storage: add DeleteMultiple method
DeleteMultiple will allow metainfo to delete multiple segments and get the old pointers in a single request. Change-Id: Ic144f30c5453274fa2b80df2895f123f5a9cc48b
This commit is contained in:
parent
f4667426b5
commit
da5e408afe
@ -153,6 +153,34 @@ func (client *Client) Delete(ctx context.Context, key storage.Key) (err error) {
|
||||
})
|
||||
}
|
||||
|
||||
// DeleteMultiple deletes keys ignoring missing keys
|
||||
func (client *Client) DeleteMultiple(ctx context.Context, keys []storage.Key) (_ storage.Items, err error) {
|
||||
defer mon.Task()(&ctx, len(keys))(&err)
|
||||
|
||||
var items storage.Items
|
||||
err = client.update(func(bucket *bolt.Bucket) error {
|
||||
for _, key := range keys {
|
||||
value := bucket.Get(key)
|
||||
if len(value) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
items = append(items, storage.ListItem{
|
||||
Key: key,
|
||||
Value: value,
|
||||
})
|
||||
|
||||
err := bucket.Delete(key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
return items, err
|
||||
}
|
||||
|
||||
// List returns either a list of keys for which boltdb has values or an error.
|
||||
func (client *Client) List(ctx context.Context, first storage.Key, limit int) (_ storage.Keys, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
@ -164,6 +164,38 @@ func (client *Client) Delete(ctx context.Context, key storage.Key) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeleteMultiple deletes keys ignoring missing keys
|
||||
func (client *Client) DeleteMultiple(ctx context.Context, keys []storage.Key) (_ storage.Items, err error) {
|
||||
defer mon.Task()(&ctx, len(keys))(&err)
|
||||
|
||||
rows, err := client.db.QueryContext(ctx, `
|
||||
DELETE FROM pathdata
|
||||
WHERE fullpath = any($1::BYTEA[])
|
||||
RETURNING fullpath, metadata`,
|
||||
pq.ByteaArray(storage.Keys(keys).ByteSlices()))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer func() {
|
||||
err = errs.Combine(err, rows.Close())
|
||||
}()
|
||||
|
||||
var items storage.Items
|
||||
for rows.Next() {
|
||||
var key, value []byte
|
||||
err := rows.Scan(&key, &value)
|
||||
if err != nil {
|
||||
return items, err
|
||||
}
|
||||
items = append(items, storage.ListItem{
|
||||
Key: key,
|
||||
Value: value,
|
||||
})
|
||||
}
|
||||
|
||||
return items, rows.Err()
|
||||
}
|
||||
|
||||
// List returns either a list of known keys, in order, or an error.
|
||||
func (client *Client) List(ctx context.Context, first storage.Key, limit int) (_ storage.Keys, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
@ -67,6 +67,8 @@ type KeyValueStore interface {
|
||||
GetAll(context.Context, Keys) (Values, error)
|
||||
// Delete deletes key and the value
|
||||
Delete(context.Context, Key) error
|
||||
// DeleteMultiple deletes keys and returns nil for
|
||||
DeleteMultiple(context.Context, []Key) (Items, error)
|
||||
// List lists all keys starting from start and upto limit items
|
||||
List(ctx context.Context, start Key, limit int) (Keys, error)
|
||||
// Iterate iterates over items based on opts
|
||||
|
@ -161,6 +161,38 @@ func (client *Client) Delete(ctx context.Context, key storage.Key) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeleteMultiple deletes keys ignoring missing keys
|
||||
func (client *Client) DeleteMultiple(ctx context.Context, keys []storage.Key) (_ storage.Items, err error) {
|
||||
defer mon.Task()(&ctx, len(keys))(&err)
|
||||
|
||||
rows, err := client.db.QueryContext(ctx, `
|
||||
DELETE FROM pathdata
|
||||
WHERE fullpath = any($1::BYTEA[])
|
||||
RETURNING fullpath, metadata`,
|
||||
pq.ByteaArray(storage.Keys(keys).ByteSlices()))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer func() {
|
||||
err = errs.Combine(err, rows.Close())
|
||||
}()
|
||||
|
||||
var items storage.Items
|
||||
for rows.Next() {
|
||||
var key, value []byte
|
||||
err := rows.Scan(&key, &value)
|
||||
if err != nil {
|
||||
return items, err
|
||||
}
|
||||
items = append(items, storage.ListItem{
|
||||
Key: key,
|
||||
Value: value,
|
||||
})
|
||||
}
|
||||
|
||||
return items, rows.Err()
|
||||
}
|
||||
|
||||
// List returns either a list of known keys, in order, or an error.
|
||||
func (client *Client) List(ctx context.Context, first storage.Key, limit int) (_ storage.Keys, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
@ -127,6 +127,12 @@ func (client *Client) Delete(ctx context.Context, key storage.Key) (err error) {
|
||||
return delete(ctx, client.db, key)
|
||||
}
|
||||
|
||||
// DeleteMultiple deletes keys ignoring missing keys
|
||||
func (client *Client) DeleteMultiple(ctx context.Context, keys []storage.Key) (_ storage.Items, err error) {
|
||||
defer mon.Task()(&ctx, len(keys))(&err)
|
||||
return deleteMultiple(ctx, client.db, keys)
|
||||
}
|
||||
|
||||
// Close closes a redis client
|
||||
func (client *Client) Close() error {
|
||||
return client.db.Close()
|
||||
@ -314,3 +320,32 @@ func delete(ctx context.Context, cmdable redis.Cmdable, key storage.Key) (err er
|
||||
}
|
||||
return errs.Wrap(err)
|
||||
}
|
||||
|
||||
func deleteMultiple(ctx context.Context, cmdable redis.Cmdable, keys []storage.Key) (_ storage.Items, err error) {
|
||||
defer mon.Task()(&ctx, len(keys))(&err)
|
||||
|
||||
var items storage.Items
|
||||
for _, key := range keys {
|
||||
value, err := get(ctx, cmdable, key)
|
||||
if err != nil {
|
||||
if errs.Is(err, redis.Nil) || storage.ErrKeyNotFound.Has(err) {
|
||||
continue
|
||||
}
|
||||
return items, err
|
||||
}
|
||||
|
||||
err = delete(ctx, cmdable, key)
|
||||
if err != nil {
|
||||
if errs.Is(err, redis.Nil) || storage.ErrKeyNotFound.Has(err) {
|
||||
continue
|
||||
}
|
||||
return items, err
|
||||
}
|
||||
items = append(items, storage.ListItem{
|
||||
Key: key,
|
||||
Value: value,
|
||||
})
|
||||
}
|
||||
|
||||
return items, nil
|
||||
}
|
||||
|
@ -62,6 +62,13 @@ func (store *Logger) Delete(ctx context.Context, key storage.Key) (err error) {
|
||||
return store.store.Delete(ctx, key)
|
||||
}
|
||||
|
||||
// DeleteMultiple deletes keys ignoring missing keys
|
||||
func (store *Logger) DeleteMultiple(ctx context.Context, keys []storage.Key) (_ storage.Items, err error) {
|
||||
defer mon.Task()(&ctx, len(keys))(&err)
|
||||
store.log.Debug("DeleteMultiple", zap.Any("keys", keys))
|
||||
return store.store.DeleteMultiple(ctx, keys)
|
||||
}
|
||||
|
||||
// List lists all keys starting from first and upto limit items
|
||||
func (store *Logger) List(ctx context.Context, first storage.Key, limit int) (_ storage.Keys, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
@ -175,6 +175,35 @@ func (store *Client) Delete(ctx context.Context, key storage.Key) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeleteMultiple deletes keys ignoring missing keys
|
||||
func (store *Client) DeleteMultiple(ctx context.Context, keys []storage.Key) (_ storage.Items, err error) {
|
||||
defer mon.Task()(&ctx, len(keys))(&err)
|
||||
defer store.locked()()
|
||||
|
||||
store.version++
|
||||
store.CallCount.Delete++
|
||||
|
||||
if store.forcedError() {
|
||||
return nil, errInternal
|
||||
}
|
||||
|
||||
var items storage.Items
|
||||
for _, key := range keys {
|
||||
keyIndex, found := store.indexOf(key)
|
||||
if !found {
|
||||
continue
|
||||
}
|
||||
e := store.Items[keyIndex]
|
||||
items = append(items, storage.ListItem{
|
||||
Key: e.Key,
|
||||
Value: e.Value,
|
||||
})
|
||||
store.delete(keyIndex)
|
||||
}
|
||||
|
||||
return items, nil
|
||||
}
|
||||
|
||||
// List lists all keys starting from start and upto limit items
|
||||
func (store *Client) List(ctx context.Context, first storage.Key, limit int) (_ storage.Keys, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
@ -6,8 +6,11 @@ package testsuite
|
||||
import (
|
||||
"bytes"
|
||||
"math/rand"
|
||||
"sort"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"storj.io/common/testcontext"
|
||||
"storj.io/storj/storage"
|
||||
)
|
||||
@ -90,7 +93,50 @@ func testCRUD(t *testing.T, ctx *testcontext.Context, store storage.KeyValueStor
|
||||
})
|
||||
|
||||
t.Run("Delete", func(t *testing.T) {
|
||||
for _, item := range items {
|
||||
k := len(items) / 2
|
||||
batch, nonbatch := items[:k], items[k:]
|
||||
|
||||
var list []storage.Key
|
||||
for _, item := range batch {
|
||||
list = append(list, item.Key)
|
||||
}
|
||||
|
||||
var expected storage.Items
|
||||
for _, item := range batch {
|
||||
value, err := store.Get(ctx, item.Key)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to get %v: %v", item.Key, value)
|
||||
}
|
||||
expected = append(expected, storage.ListItem{
|
||||
Key: item.Key,
|
||||
Value: value,
|
||||
})
|
||||
}
|
||||
|
||||
deleted, err := store.DeleteMultiple(ctx, list)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to batch delete: %v", err)
|
||||
}
|
||||
|
||||
sort.Slice(expected, func(i, k int) bool {
|
||||
return expected[i].Key.Less(expected[k].Key)
|
||||
})
|
||||
sort.Slice(deleted, func(i, k int) bool {
|
||||
return deleted[i].Key.Less(deleted[k].Key)
|
||||
})
|
||||
require.Equal(t, expected, deleted)
|
||||
|
||||
// Duplicate delete should also be fine.
|
||||
retry, err := store.DeleteMultiple(ctx, list)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to batch delete: %v", err)
|
||||
}
|
||||
if len(retry) != 0 {
|
||||
t.Fatalf("expected delete to return nothing: %v", len(retry))
|
||||
}
|
||||
|
||||
// individual deletes
|
||||
for _, item := range nonbatch {
|
||||
err := store.Delete(ctx, item.Key)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to delete %v: %v", item.Key, err)
|
||||
|
Loading…
Reference in New Issue
Block a user