storage: add monkit task to missing places (#2122)
* storage: add monkit task to missing places
  Change-Id: I9e17a6b14f7c25bbf698eeecf32785e9add3f26e
* fix tests
  Change-Id: Id078276fa3de61a28eb3d01d4e751732ecbb173f
* import order
  Change-Id: I814e33755b9f10b5219af37cd828cd75eb3da1a4
* remove part of other commit
  Change-Id: Idaa4c95cd65e97567fb466de49718db8203cfbe1
This commit is contained in: parent 68c8ce63a7, commit f1641af802
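The whole diff applies one pattern: thread a context.Context parameter through the storage call chain and register a monkit task so each function reports call counts, timing, and its returned error. A minimal, self-contained sketch of that pattern, assuming gopkg.in/spacemonkeygo/monkit.v2; the DB interface and Service type below are illustrative placeholders, not names from this commit:

package example

import (
	"context"

	monkit "gopkg.in/spacemonkeygo/monkit.v2"
)

// mon is the package-level monkit scope, mirroring the
// `var mon = monkit.Package()` added to the storage package below.
var mon = monkit.Package()

// DB stands in for the context-aware key/value store shape this
// commit introduces; only Get is shown here.
type DB interface {
	Get(ctx context.Context, key []byte) ([]byte, error)
}

// Service is an illustrative wrapper, not a type from the commit.
type Service struct{ db DB }

// Get shows the instrumentation repeated throughout the diff:
// mon.Task records timing and the returned error, and the context
// is passed down to the underlying store.
func (s *Service) Get(ctx context.Context, key string) (_ []byte, err error) {
	defer mon.Task()(&ctx)(&err)
	return s.db.Get(ctx, []byte(key))
}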
@@ -69,14 +69,14 @@ func TestDownloadWithSomeNodesOffline(t *testing.T) {

 // get a remote segment from pointerdb
 pdb := satellite.Metainfo.Service
-listResponse, _, err := pdb.List("", "", "", true, 0, 0)
+listResponse, _, err := pdb.List(ctx, "", "", "", true, 0, 0)
 require.NoError(t, err)

 var path string
 var pointer *pb.Pointer
 for _, v := range listResponse {
 path = v.GetPath()
-pointer, err = pdb.Get(path)
+pointer, err = pdb.Get(ctx, path)
 require.NoError(t, err)
 if pointer.GetType() == pb.Pointer_REMOTE {
 break
@@ -242,14 +242,14 @@ func TestDownloadFromUnresponsiveNode(t *testing.T) {

 // get a remote segment from pointerdb
 pdb := planet.Satellites[0].Metainfo.Service
-listResponse, _, err := pdb.List("", "", "", true, 0, 0)
+listResponse, _, err := pdb.List(ctx, "", "", "", true, 0, 0)
 require.NoError(t, err)

 var path string
 var pointer *pb.Pointer
 for _, v := range listResponse {
 path = v.GetPath()
-pointer, err = pdb.Get(path)
+pointer, err = pdb.Get(ctx, path)
 require.NoError(t, err)
 if pointer.GetType() == pb.Pointer_REMOTE {
 break
@@ -117,10 +117,10 @@ func (t *Service) CalculateAtRestData(ctx context.Context) (latestTally time.Tim
 var bucketCount int64
 var totalTallies, currentBucketTally accounting.BucketTally

-err = t.metainfo.Iterate("", "", true, false,
-func(it storage.Iterator) error {
+err = t.metainfo.Iterate(ctx, "", "", true, false,
+func(ctx context.Context, it storage.Iterator) error {
 var item storage.ListItem
-for it.Next(&item) {
+for it.Next(ctx, &item) {

 pointer := &pb.Pointer{}
 err = proto.Unmarshal(item.Value, pointer)
@@ -52,7 +52,7 @@ func (cursor *Cursor) NextStripe(ctx context.Context) (stripe *Stripe, more bool
 var pointerItems []*pb.ListResponse_Item
 var path storj.Path

-pointerItems, more, err = cursor.metainfo.List("", cursor.lastPath, "", true, 0, meta.None)
+pointerItems, more, err = cursor.metainfo.List(ctx, "", cursor.lastPath, "", true, 0, meta.None)
 if err != nil {
 return nil, more, err
 }
@@ -117,7 +117,7 @@ func (cursor *Cursor) getRandomValidPointer(ctx context.Context, pointerItems []
 path := pointerItem.Path

 // get pointer info
-pointer, err := cursor.metainfo.Get(path)
+pointer, err := cursor.metainfo.Get(ctx, path)
 if err != nil {
 errGroup.Add(err)
 continue
@@ -131,7 +131,7 @@ func (cursor *Cursor) getRandomValidPointer(ctx context.Context, pointerItems []
 continue
 }
 if t.Before(time.Now()) {
-err := cursor.metainfo.Delete(path)
+err := cursor.metainfo.Delete(ctx, path)
 if err != nil {
 errGroup.Add(err)
 }
@@ -4,6 +4,7 @@
 package audit_test

 import (
+"context"
 "crypto/rand"
 "math"
 "math/big"
@@ -55,7 +56,7 @@ func TestAuditSegment(t *testing.T) {

 // test to see how random paths are
 t.Run("probabilisticTest", func(t *testing.T) {
-list, _, err := metainfo.List("", "", "", true, 10, meta.None)
+list, _, err := metainfo.List(ctx, "", "", "", true, 10, meta.None)
 require.NoError(t, err)
 require.Len(t, list, 10)

@@ -119,7 +120,7 @@ func TestDeleteExpired(t *testing.T) {
 //populate metainfo with 10 expired pointers of test data
 _, cursor, metainfo := populateTestData(t, planet, &timestamp.Timestamp{})
 //make sure it they're in there
-list, _, err := metainfo.List("", "", "", true, 10, meta.None)
+list, _, err := metainfo.List(ctx, "", "", "", true, 10, meta.None)
 require.NoError(t, err)
 require.Len(t, list, 10)
 // make sure an error and no pointer is returned
@@ -129,7 +130,7 @@ func TestDeleteExpired(t *testing.T) {
 require.Nil(t, stripe)
 })
 //make sure it they're not in there anymore
-list, _, err = metainfo.List("", "", "", true, 10, meta.None)
+list, _, err = metainfo.List(ctx, "", "", "", true, 10, meta.None)
 require.NoError(t, err)
 require.Len(t, list, 0)
 })
@@ -141,6 +142,7 @@ type testData struct {
 }

 func populateTestData(t *testing.T, planet *testplanet.Planet, expiration *timestamp.Timestamp) ([]testData, *audit.Cursor, *metainfo.Service) {
+ctx := context.TODO()
 tests := []testData{
 {bm: "success-1", path: "folder1/file1"},
 {bm: "success-2", path: "foodFolder1/file1/file2"},
@@ -162,7 +164,7 @@ func populateTestData(t *testing.T, planet *testplanet.Planet, expiration *times
 test := tt
 t.Run(test.bm, func(t *testing.T) {
 pointer := makePointer(test.path, expiration)
-require.NoError(t, metainfo.Put(test.path, pointer))
+require.NoError(t, metainfo.Put(ctx, test.path, pointer))
 })
 }
 })
@@ -303,7 +303,7 @@ func (authDB *AuthorizationDB) Create(ctx context.Context, userID string, count
 // Get retrieves authorizations by user ID.
 func (authDB *AuthorizationDB) Get(ctx context.Context, userID string) (_ Authorizations, err error) {
 defer mon.Task()(&ctx)(&err)
-authsBytes, err := authDB.DB.Get(storage.Key(userID))
+authsBytes, err := authDB.DB.Get(ctx, storage.Key(userID))
 if err != nil && !storage.ErrKeyNotFound.Has(err) {
 return nil, ErrAuthorizationDB.Wrap(err)
 }
@@ -321,11 +321,11 @@ func (authDB *AuthorizationDB) Get(ctx context.Context, userID string) (_ Author
 // UserIDs returns a list of all userIDs present in the authorization database.
 func (authDB *AuthorizationDB) UserIDs(ctx context.Context) (userIDs []string, err error) {
 defer mon.Task()(&ctx)(&err)
-err = authDB.DB.Iterate(storage.IterateOptions{
+err = authDB.DB.Iterate(ctx, storage.IterateOptions{
 Recurse: true,
-}, func(iterator storage.Iterator) error {
+}, func(ctx context.Context, iterator storage.Iterator) error {
 var listItem storage.ListItem
-for iterator.Next(&listItem) {
+for iterator.Next(ctx, &listItem) {
 userIDs = append(userIDs, listItem.Key.String())
 }
 return nil
@@ -336,12 +336,12 @@ func (authDB *AuthorizationDB) UserIDs(ctx context.Context) (userIDs []string, e
 // List returns all authorizations in the database.
 func (authDB *AuthorizationDB) List(ctx context.Context) (auths Authorizations, err error) {
 defer mon.Task()(&ctx)(&err)
-err = authDB.DB.Iterate(storage.IterateOptions{
+err = authDB.DB.Iterate(ctx, storage.IterateOptions{
 Recurse: true,
-}, func(iterator storage.Iterator) error {
+}, func(ctx context.Context, iterator storage.Iterator) error {
 var listErrs errs.Group
 var listItem storage.ListItem
-for iterator.Next(&listItem) {
+for iterator.Next(ctx, &listItem) {
 var nextAuths Authorizations
 if err := nextAuths.Unmarshal(listItem.Value); err != nil {
 listErrs.Add(err)
@@ -450,7 +450,7 @@ func (authDB *AuthorizationDB) put(ctx context.Context, userID string, auths Aut
 return ErrAuthorizationDB.Wrap(err)
 }

-if err := authDB.DB.Put(storage.Key(userID), authsBytes); err != nil {
+if err := authDB.DB.Put(ctx, storage.Key(userID), authsBytes); err != nil {
 return ErrAuthorizationDB.Wrap(err)
 }
 return nil
@@ -107,10 +107,10 @@ func TestAuthorizationDB_Create(t *testing.T) {
 emailKey := storage.Key(testCase.email)

 if testCase.startCount == 0 {
-_, err = authDB.DB.Get(emailKey)
+_, err = authDB.DB.Get(ctx, emailKey)
 assert.Error(t, err)
 } else {
-v, err := authDB.DB.Get(emailKey)
+v, err := authDB.DB.Get(ctx, emailKey)
 require.NoError(t, err)
 require.NotEmpty(t, v)

@@ -132,7 +132,7 @@ func TestAuthorizationDB_Create(t *testing.T) {
 }
 assert.Len(t, expectedAuths, testCase.newCount)

-v, err := authDB.DB.Get(emailKey)
+v, err := authDB.DB.Get(ctx, emailKey)
 assert.NoError(t, err)
 assert.NotEmpty(t, v)

@@ -162,7 +162,7 @@ func TestAuthorizationDB_Get(t *testing.T) {
 authsBytes, err := expectedAuths.Marshal()
 require.NoError(t, err)

-err = authDB.DB.Put(storage.Key("user@example.com"), authsBytes)
+err = authDB.DB.Put(ctx, storage.Key("user@example.com"), authsBytes)
 require.NoError(t, err)

 cases := []struct {
@@ -101,13 +101,13 @@ func (checker *Checker) Close() error {
 func (checker *Checker) IdentifyInjuredSegments(ctx context.Context) (err error) {
 defer mon.Task()(&ctx)(&err)

-err = checker.metainfo.Iterate("", checker.lastChecked, true, false,
-func(it storage.Iterator) error {
+err = checker.metainfo.Iterate(ctx, "", checker.lastChecked, true, false,
+func(ctx context.Context, it storage.Iterator) error {
 var item storage.ListItem

 defer func() {
 var nextItem storage.ListItem
-it.Next(&nextItem)
+it.Next(ctx, &nextItem)
 // start at the next item in the next call
 checker.lastChecked = nextItem.Key.String()
 // if we have finished iterating, send and reset durability stats
@@ -124,7 +124,7 @@ func (checker *Checker) IdentifyInjuredSegments(ctx context.Context) (err error)
 }
 }()

-for it.Next(&item) {
+for it.Next(ctx, &item) {
 pointer := &pb.Pointer{}

 err = proto.Unmarshal(item.Value, pointer)
@@ -100,7 +100,7 @@ func TestIdentifyIrreparableSegments(t *testing.T) {

 // put test pointer to db
 metainfo := planet.Satellites[0].Metainfo.Service
-err := metainfo.Put("fake-piece-id", pointer)
+err := metainfo.Put(ctx, "fake-piece-id", pointer)
 require.NoError(t, err)

 err = checker.IdentifyInjuredSegments(ctx)
@@ -146,7 +146,7 @@ func TestIdentifyIrreparableSegments(t *testing.T) {
 }
 // put test pointer to db
 metainfo = planet.Satellites[0].Metainfo.Service
-err = metainfo.Put("fake-piece-id", pointer)
+err = metainfo.Put(ctx, "fake-piece-id", pointer)
 require.NoError(t, err)

 err = checker.IdentifyInjuredSegments(ctx)
@@ -158,6 +158,7 @@ func TestIdentifyIrreparableSegments(t *testing.T) {
 }

 func makePointer(t *testing.T, planet *testplanet.Planet, pieceID string, createLost bool) {
+ctx := context.TODO()
 numOfStorageNodes := len(planet.StorageNodes)
 pieces := make([]*pb.RemotePiece, 0, numOfStorageNodes)
 // use online nodes
@@ -192,7 +193,7 @@ func makePointer(t *testing.T, planet *testplanet.Planet, pieceID string, create
 }
 // put test pointer to db
 pointerdb := planet.Satellites[0].Metainfo.Service
-err := pointerdb.Put(pieceID, pointer)
+err := pointerdb.Put(ctx, pieceID, pointer)
 require.NoError(t, err)
 }
@@ -50,14 +50,14 @@ func TestDataRepair(t *testing.T) {

 // get a remote segment from metainfo
 metainfo := satellite.Metainfo.Service
-listResponse, _, err := metainfo.List("", "", "", true, 0, 0)
+listResponse, _, err := metainfo.List(ctx, "", "", "", true, 0, 0)
 require.NoError(t, err)

 var path string
 var pointer *pb.Pointer
 for _, v := range listResponse {
 path = v.GetPath()
-pointer, err = metainfo.Get(path)
+pointer, err = metainfo.Get(ctx, path)
 assert.NoError(t, err)
 if pointer.GetType() == pb.Pointer_REMOTE {
 break
@@ -122,7 +122,7 @@ func TestDataRepair(t *testing.T) {
 assert.Equal(t, newData, testData)

 // updated pointer should not contain any of the killed nodes
-pointer, err = metainfo.Get(path)
+pointer, err = metainfo.Get(ctx, path)
 assert.NoError(t, err)

 remotePieces = pointer.GetRemote().GetRemotePieces()
@@ -80,7 +80,7 @@ func (r RevocationDB) Get(ctx context.Context, chain []*x509.Certificate) (_ *ex
 return nil, extensions.ErrRevocation.Wrap(err)
 }

-revBytes, err := r.DB.Get(nodeID.Bytes())
+revBytes, err := r.DB.Get(ctx, nodeID.Bytes())
 if err != nil && !storage.ErrKeyNotFound.Has(err) {
 return nil, extensions.ErrRevocationDB.Wrap(err)
 }
@@ -124,7 +124,7 @@ func (r RevocationDB) Put(ctx context.Context, chain []*x509.Certificate, revExt
 if err != nil {
 return extensions.ErrRevocationDB.Wrap(err)
 }
-if err := r.DB.Put(nodeID.Bytes(), revExt.Value); err != nil {
+if err := r.DB.Put(ctx, nodeID.Bytes(), revExt.Value); err != nil {
 return extensions.ErrRevocationDB.Wrap(err)
 }
 return nil
@@ -133,12 +133,12 @@ func (r RevocationDB) Put(ctx context.Context, chain []*x509.Certificate, revExt
 // List lists all revocations in the store
 func (r RevocationDB) List(ctx context.Context) (revs []*extensions.Revocation, err error) {
 defer mon.Task()(&ctx)(&err)
-keys, err := r.DB.List([]byte{}, 0)
+keys, err := r.DB.List(ctx, []byte{}, 0)
 if err != nil {
 return nil, extensions.ErrRevocationDB.Wrap(err)
 }

-marshaledRevs, err := r.DB.GetAll(keys)
+marshaledRevs, err := r.DB.GetAll(ctx, keys)
 if err != nil {
 return nil, extensions.ErrRevocationDB.Wrap(err)
 }
@@ -44,7 +44,7 @@ func TestRevocationDB_Get(t *testing.T) {
 nodeID, err := identity.NodeIDFromCert(chain[peertls.CAIndex])
 require.NoError(t, err)

-err = db.Put(nodeID.Bytes(), ext.Value)
+err = db.Put(ctx, nodeID.Bytes(), ext.Value)
 require.NoError(t, err)
 }

@@ -102,7 +102,7 @@ func TestRevocationDB_Put_success(t *testing.T) {
 nodeID, err := identity.NodeIDFromCert(chain[peertls.CAIndex])
 require.NoError(t, err)

-revBytes, err := db.Get(nodeID.Bytes())
+revBytes, err := db.Get(ctx, nodeID.Bytes())
 require.NoError(t, err)

 assert.Equal(t, testcase.ext.Value, []byte(revBytes))
@@ -309,7 +309,7 @@ func (k *Kademlia) lookup(ctx context.Context, nodeID storj.NodeID, isBootstrap
 if err != nil {
 return pb.Node{}, err
 }
-bucket, err := k.routingTable.getKBucketID(nodeID)
+bucket, err := k.routingTable.getKBucketID(ctx, nodeID)
 if err != nil {
 k.log.Warn("Error getting getKBucketID in kad lookup")
 } else {
@@ -145,7 +145,7 @@ func TestBootstrap(t *testing.T) {
 err = n2.Bootstrap(ctx)
 require.NoError(t, err)

-nodeIDs, err := n2.routingTable.nodeBucketDB.List(nil, 0)
+nodeIDs, err := n2.routingTable.nodeBucketDB.List(ctx, nil, 0)
 require.NoError(t, err)
 assert.Len(t, nodeIDs, 3)
 }
@@ -134,7 +134,9 @@ func (rt *RoutingTable) CacheSize() int {
 // GetNodes retrieves nodes within the same kbucket as the given node id
 // Note: id doesn't need to be stored at time of search
 func (rt *RoutingTable) GetNodes(id storj.NodeID) ([]*pb.Node, bool) {
-bID, err := rt.getKBucketID(id)
+ctx := context.TODO()
+defer mon.Task()(&ctx)(nil)
+bID, err := rt.getKBucketID(ctx, id)
 if err != nil {
 return nil, false
 }
@@ -149,8 +151,11 @@ func (rt *RoutingTable) GetNodes(id storj.NodeID) ([]*pb.Node, bool) {
 }

 // GetBucketIds returns a storage.Keys type of bucket ID's in the Kademlia instance
-func (rt *RoutingTable) GetBucketIds() (storage.Keys, error) {
-kbuckets, err := rt.kadBucketDB.List(nil, 0)
+func (rt *RoutingTable) GetBucketIds() (_ storage.Keys, err error) {
+ctx := context.TODO()
+defer mon.Task()(&ctx)(&err)
+
+kbuckets, err := rt.kadBucketDB.List(ctx, nil, 0)
 if err != nil {
 return nil, err
 }
@@ -158,11 +163,14 @@ func (rt *RoutingTable) GetBucketIds() (storage.Keys, error) {
 }

 // DumpNodes iterates through all nodes in the nodeBucketDB and marshals them to &pb.Nodes, then returns them
-func (rt *RoutingTable) DumpNodes() ([]*pb.Node, error) {
+func (rt *RoutingTable) DumpNodes() (_ []*pb.Node, err error) {
+ctx := context.TODO()
+defer mon.Task()(&ctx)(&err)
+
 var nodes []*pb.Node
 var nodeErrors errs.Group

-err := rt.iterateNodes(storj.NodeID{}, func(newID storj.NodeID, protoNode []byte) error {
+err = rt.iterateNodes(ctx, storj.NodeID{}, func(ctx context.Context, newID storj.NodeID, protoNode []byte) error {
 newNode := pb.Node{}
 err := proto.Unmarshal(protoNode, &newNode)
 if err != nil {
@@ -185,7 +193,7 @@ func (rt *RoutingTable) FindNear(target storj.NodeID, limit int) (_ []*pb.Node,
 ctx := context.TODO()
 defer mon.Task()(&ctx)(&err)
 closestNodes := make([]*pb.Node, 0, limit+1)
-err = rt.iterateNodes(storj.NodeID{}, func(newID storj.NodeID, protoNode []byte) error {
+err = rt.iterateNodes(ctx, storj.NodeID{}, func(ctx context.Context, newID storj.NodeID, protoNode []byte) error {
 newPos := len(closestNodes)
 for ; newPos > 0 && compareByXor(closestNodes[newPos-1].Id, newID, target) > 0; newPos-- {
 }
@@ -222,7 +230,7 @@ func (rt *RoutingTable) ConnectionSuccess(node *pb.Node) (err error) {
 rt.mutex.Lock()
 rt.seen[node.Id] = node
 rt.mutex.Unlock()
-v, err := rt.nodeBucketDB.Get(storage.Key(node.Id.Bytes()))
+v, err := rt.nodeBucketDB.Get(ctx, storage.Key(node.Id.Bytes()))
 if err != nil && !storage.ErrKeyNotFound.Has(err) {
 return RoutingErr.New("could not get node %s", err)
 }
@@ -254,10 +262,12 @@ func (rt *RoutingTable) ConnectionFailed(node *pb.Node) (err error) {
 }

 // SetBucketTimestamp records the time of the last node lookup for a bucket
-func (rt *RoutingTable) SetBucketTimestamp(bIDBytes []byte, now time.Time) error {
+func (rt *RoutingTable) SetBucketTimestamp(bIDBytes []byte, now time.Time) (err error) {
+ctx := context.TODO()
+defer mon.Task()(&ctx)(&err)
 rt.mutex.Lock()
 defer rt.mutex.Unlock()
-err := rt.createOrUpdateKBucket(keyToBucketID(bIDBytes), now)
+err = rt.createOrUpdateKBucket(ctx, keyToBucketID(bIDBytes), now)
 if err != nil {
 return NodeErr.New("could not update bucket timestamp %s", err)
 }
@@ -265,8 +275,11 @@ func (rt *RoutingTable) SetBucketTimestamp(bIDBytes []byte, now time.Time) error
 }

 // GetBucketTimestamp retrieves time of the last node lookup for a bucket
-func (rt *RoutingTable) GetBucketTimestamp(bIDBytes []byte) (time.Time, error) {
-t, err := rt.kadBucketDB.Get(bIDBytes)
+func (rt *RoutingTable) GetBucketTimestamp(bIDBytes []byte) (_ time.Time, err error) {
+ctx := context.TODO()
+defer mon.Task()(&ctx)(&err)
+
+t, err := rt.kadBucketDB.Get(ctx, bIDBytes)
 if err != nil {
 return time.Now(), RoutingErr.New("could not get bucket timestamp %s", err)
 }
@@ -274,11 +287,12 @@ func (rt *RoutingTable) GetBucketTimestamp(bIDBytes []byte) (time.Time, error) {
 return time.Unix(0, timestamp).UTC(), nil
 }

-func (rt *RoutingTable) iterateNodes(start storj.NodeID, f func(storj.NodeID, []byte) error, skipSelf bool) error {
-return rt.nodeBucketDB.Iterate(storage.IterateOptions{First: storage.Key(start.Bytes()), Recurse: true},
-func(it storage.Iterator) error {
+func (rt *RoutingTable) iterateNodes(ctx context.Context, start storj.NodeID, f func(context.Context, storj.NodeID, []byte) error, skipSelf bool) (err error) {
+defer mon.Task()(&ctx)(&err)
+return rt.nodeBucketDB.Iterate(ctx, storage.IterateOptions{First: storage.Key(start.Bytes()), Recurse: true},
+func(ctx context.Context, it storage.Iterator) error {
 var item storage.ListItem
-for it.Next(&item) {
+for it.Next(ctx, &item) {
 nodeID, err := storj.NodeIDFromBytes(item.Key)
 if err != nil {
 return err
@@ -286,7 +300,7 @@ func (rt *RoutingTable) iterateNodes(start storj.NodeID, f func(storj.NodeID, []
 if skipSelf && nodeID == rt.self.Id {
 continue
 }
-err = f(nodeID, item.Value)
+err = f(ctx, nodeID, item.Value)
 if err != nil {
 return err
 }
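Note the interim pattern in the hunks above: exported methods whose signatures could not change yet (GetNodes, GetBucketIds, DumpNodes, and several helpers) bridge with context.TODO() so the monkit task and the context-aware storage calls still work. A self-contained sketch of that pattern, assuming monkit.v2; the table and store types are illustrative placeholders, not names from the commit:

package example

import (
	"context"

	monkit "gopkg.in/spacemonkeygo/monkit.v2"
)

var mon = monkit.Package()

// store stands in for a context-aware key/value store.
type store interface {
	Get(ctx context.Context, key []byte) ([]byte, error)
}

// table is an illustrative receiver, not a type from the commit.
type table struct{ db store }

// fetch keeps its context-free signature but creates a TODO context
// locally, as the routing-table methods above do, so instrumentation
// and the underlying calls can already accept a context.
func (t *table) fetch(key []byte) (_ []byte, err error) {
	ctx := context.TODO() // placeholder until callers pass a real context
	defer mon.Task()(&ctx)(&err)
	return t.db.Get(ctx, key)
}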
@@ -25,7 +25,7 @@ func (rt *RoutingTable) addNode(node *pb.Node) (_ bool, err error) {
 defer rt.mutex.Unlock()

 if node.Id == rt.self.Id {
-err := rt.createOrUpdateKBucket(firstBucketID, time.Now())
+err := rt.createOrUpdateKBucket(ctx, firstBucketID, time.Now())
 if err != nil {
 return false, RoutingErr.New("could not create initial K bucket: %s", err)
 }
@@ -35,7 +35,7 @@ func (rt *RoutingTable) addNode(node *pb.Node) (_ bool, err error) {
 }
 return true, nil
 }
-kadBucketID, err := rt.getKBucketID(node.Id)
+kadBucketID, err := rt.getKBucketID(ctx, node.Id)
 if err != nil {
 return false, RoutingErr.New("could not getKBucketID: %s", err)
 }
@@ -59,11 +59,11 @@ func (rt *RoutingTable) addNode(node *pb.Node) (_ bool, err error) {
 return false, RoutingErr.New("could not determine leaf depth: %s", err)
 }
 kadBucketID = rt.splitBucket(kadBucketID, depth)
-err = rt.createOrUpdateKBucket(kadBucketID, time.Now())
+err = rt.createOrUpdateKBucket(ctx, kadBucketID, time.Now())
 if err != nil {
 return false, RoutingErr.New("could not split and create K bucket: %s", err)
 }
-kadBucketID, err = rt.getKBucketID(node.Id)
+kadBucketID, err = rt.getKBucketID(ctx, node.Id)
 if err != nil {
 return false, RoutingErr.New("could not get k bucket Id within add node split bucket checks: %s", err)
 }
@@ -85,7 +85,7 @@ func (rt *RoutingTable) addNode(node *pb.Node) (_ bool, err error) {
 if err != nil {
 return false, RoutingErr.New("could not add node to nodeBucketDB: %s", err)
 }
-err = rt.createOrUpdateKBucket(kadBucketID, time.Now())
+err = rt.createOrUpdateKBucket(ctx, kadBucketID, time.Now())
 if err != nil {
 return false, RoutingErr.New("could not create or update K bucket: %s", err)
 }
@@ -109,13 +109,13 @@ func (rt *RoutingTable) removeNode(node *pb.Node) (err error) {
 defer mon.Task()(&ctx)(&err)
 rt.mutex.Lock()
 defer rt.mutex.Unlock()
-kadBucketID, err := rt.getKBucketID(node.Id)
+kadBucketID, err := rt.getKBucketID(ctx, node.Id)

 if err != nil {
 return RoutingErr.New("could not get k bucket %s", err)
 }

-existingMarshalled, err := rt.nodeBucketDB.Get(node.Id.Bytes())
+existingMarshalled, err := rt.nodeBucketDB.Get(ctx, node.Id.Bytes())
 if storage.ErrKeyNotFound.Has(err) {
 //check replacement cache
 rt.removeFromReplacementCache(kadBucketID, node)
@@ -134,7 +134,7 @@ func (rt *RoutingTable) removeNode(node *pb.Node) (err error) {
 // don't remove a node if the address is different
 return nil
 }
-err = rt.nodeBucketDB.Delete(node.Id.Bytes())
+err = rt.nodeBucketDB.Delete(ctx, node.Id.Bytes())
 if err != nil {
 return RoutingErr.New("could not delete node %s", err)
 }
@@ -160,7 +160,7 @@ func (rt *RoutingTable) putNode(node *pb.Node) (err error) {
 return RoutingErr.Wrap(err)
 }

-err = rt.nodeBucketDB.Put(node.Id.Bytes(), v)
+err = rt.nodeBucketDB.Put(ctx, node.Id.Bytes(), v)
 if err != nil {
 return RoutingErr.New("could not add key value pair to nodeBucketDB: %s", err)
 }
@@ -168,10 +168,11 @@ func (rt *RoutingTable) putNode(node *pb.Node) (err error) {
 }

 // createOrUpdateKBucket: helper, adds or updates given kbucket
-func (rt *RoutingTable) createOrUpdateKBucket(bID bucketID, now time.Time) error {
+func (rt *RoutingTable) createOrUpdateKBucket(ctx context.Context, bID bucketID, now time.Time) (err error) {
+defer mon.Task()(&ctx)(&err)
 dateTime := make([]byte, binary.MaxVarintLen64)
 binary.PutVarint(dateTime, now.UnixNano())
-err := rt.kadBucketDB.Put(bID[:], dateTime)
+err = rt.kadBucketDB.Put(ctx, bID[:], dateTime)
 if err != nil {
 return RoutingErr.New("could not add or update k bucket: %s", err)
 }
@@ -180,12 +181,13 @@ func (rt *RoutingTable) createOrUpdateKBucket(bID bucketID, now time.Time) error

 // getKBucketID: helper, returns the id of the corresponding k bucket given a node id.
 // The node doesn't have to be in the routing table at time of search
-func (rt *RoutingTable) getKBucketID(nodeID storj.NodeID) (bucketID, error) {
+func (rt *RoutingTable) getKBucketID(ctx context.Context, nodeID storj.NodeID) (_ bucketID, err error) {
+defer mon.Task()(&ctx)(&err)
 match := bucketID{}
-err := rt.kadBucketDB.Iterate(storage.IterateOptions{First: storage.Key{}, Recurse: true},
-func(it storage.Iterator) error {
+err = rt.kadBucketDB.Iterate(ctx, storage.IterateOptions{First: storage.Key{}, Recurse: true},
+func(ctx context.Context, it storage.Iterator) error {
 var item storage.ListItem
-for it.Next(&item) {
+for it.Next(ctx, &item) {
 match = keyToBucketID(item.Key)
 if nodeID.Less(match) {
 break
@@ -222,8 +224,10 @@ func (rt *RoutingTable) wouldBeInNearestK(nodeID storj.NodeID) (bool, error) {
 }

 // kadBucketContainsLocalNode returns true if the kbucket in question contains the local node
-func (rt *RoutingTable) kadBucketContainsLocalNode(queryID bucketID) (bool, error) {
-bID, err := rt.getKBucketID(rt.self.Id)
+func (rt *RoutingTable) kadBucketContainsLocalNode(queryID bucketID) (_ bool, err error) {
+ctx := context.TODO()
+defer mon.Task()(&ctx)(&err)
+bID, err := rt.getKBucketID(ctx, rt.self.Id)
 if err != nil {
 return false, err
 }
@@ -243,7 +247,9 @@ func (rt *RoutingTable) kadBucketHasRoom(bID bucketID) (bool, error) {
 }

 // getNodeIDsWithinKBucket: helper, returns a collection of all the node ids contained within the kbucket
-func (rt *RoutingTable) getNodeIDsWithinKBucket(bID bucketID) (storj.NodeIDList, error) {
+func (rt *RoutingTable) getNodeIDsWithinKBucket(bID bucketID) (_ storj.NodeIDList, err error) {
+ctx := context.TODO()
+defer mon.Task()(&ctx)(&err)
 endpoints, err := rt.getKBucketRange(bID)
 if err != nil {
 return nil, err
@@ -252,7 +258,7 @@ func (rt *RoutingTable) getNodeIDsWithinKBucket(bID bucketID) (storj.NodeIDList,
 right := endpoints[1]
 var ids []storj.NodeID

-err = rt.iterateNodes(left, func(nodeID storj.NodeID, protoNode []byte) error {
+err = rt.iterateNodes(ctx, left, func(ctx context.Context, nodeID storj.NodeID, protoNode []byte) error {
 if left.Less(nodeID) && (nodeID.Less(right) || nodeID == right) {
 ids = append(ids, nodeID)
 }
@@ -265,10 +271,12 @@ func (rt *RoutingTable) getNodeIDsWithinKBucket(bID bucketID) (storj.NodeIDList,
 }

 // getNodesFromIDsBytes: helper, returns array of encoded nodes from node ids
-func (rt *RoutingTable) getNodesFromIDsBytes(nodeIDs storj.NodeIDList) ([]*pb.Node, error) {
+func (rt *RoutingTable) getNodesFromIDsBytes(nodeIDs storj.NodeIDList) (_ []*pb.Node, err error) {
+ctx := context.TODO()
+defer mon.Task()(&ctx)(&err)
 var marshaledNodes []storage.Value
 for _, v := range nodeIDs {
-n, err := rt.nodeBucketDB.Get(v.Bytes())
+n, err := rt.nodeBucketDB.Get(ctx, v.Bytes())
 if err != nil {
 return nil, RoutingErr.New("could not get node id %v, %s", v, err)
 }
@@ -305,13 +313,15 @@ func (rt *RoutingTable) getUnmarshaledNodesFromBucket(bID bucketID) ([]*pb.Node,
 }

 // getKBucketRange: helper, returns the left and right endpoints of the range of node ids contained within the bucket
-func (rt *RoutingTable) getKBucketRange(bID bucketID) ([]bucketID, error) {
+func (rt *RoutingTable) getKBucketRange(bID bucketID) (_ []bucketID, err error) {
+ctx := context.TODO()
+defer mon.Task()(&ctx)(&err)
 previousBucket := bucketID{}
 endpoints := []bucketID{}
-err := rt.kadBucketDB.Iterate(storage.IterateOptions{First: storage.Key{}, Recurse: true},
-func(it storage.Iterator) error {
+err = rt.kadBucketDB.Iterate(ctx, storage.IterateOptions{First: storage.Key{}, Recurse: true},
+func(ctx context.Context, it storage.Iterator) error {
 var item storage.ListItem
-for it.Next(&item) {
+for it.Next(ctx, &item) {
 thisBucket := keyToBucketID(item.Key)
 if thisBucket == bID {
 endpoints = []bucketID{previousBucket, bID}
@@ -208,7 +208,7 @@ func TestAddNode(t *testing.T) {
 ok, err := rt.addNode(testCase.node)
 require.NoError(t, err)
 require.Equal(t, testCase.added, ok)
-kadKeys, err := rt.kadBucketDB.List(nil, 0)
+kadKeys, err := rt.kadBucketDB.List(ctx, nil, 0)
 require.NoError(t, err)
 for i, v := range kadKeys {
 require.True(t, bytes.Equal(testCase.kadIDs[i], v[:2]))
@@ -239,7 +239,7 @@ func TestUpdateNode(t *testing.T) {
 ok, err := rt.addNode(node)
 assert.True(t, ok)
 assert.NoError(t, err)
-val, err := rt.nodeBucketDB.Get(node.Id.Bytes())
+val, err := rt.nodeBucketDB.Get(ctx, node.Id.Bytes())
 assert.NoError(t, err)
 unmarshaled, err := unmarshalNodes([]storage.Value{val})
 assert.NoError(t, err)
@@ -249,7 +249,7 @@ func TestUpdateNode(t *testing.T) {
 node.Address = &pb.NodeAddress{Address: "BB"}
 err = rt.updateNode(node)
 assert.NoError(t, err)
-val, err = rt.nodeBucketDB.Get(node.Id.Bytes())
+val, err = rt.nodeBucketDB.Get(ctx, node.Id.Bytes())
 assert.NoError(t, err)
 unmarshaled, err = unmarshalNodes([]storage.Value{val})
 assert.NoError(t, err)
@@ -267,17 +267,17 @@ func TestRemoveNode(t *testing.T) {
 ok, err := rt.addNode(node)
 assert.True(t, ok)
 assert.NoError(t, err)
-val, err := rt.nodeBucketDB.Get(node.Id.Bytes())
+val, err := rt.nodeBucketDB.Get(ctx, node.Id.Bytes())
 assert.NoError(t, err)
 assert.NotNil(t, val)
 node2 := teststorj.MockNode("CC")
 rt.addToReplacementCache(kadBucketID, node2)
 err = rt.removeNode(node)
 assert.NoError(t, err)
-val, err = rt.nodeBucketDB.Get(node.Id.Bytes())
+val, err = rt.nodeBucketDB.Get(ctx, node.Id.Bytes())
 assert.Nil(t, val)
 assert.Error(t, err)
-val2, err := rt.nodeBucketDB.Get(node2.Id.Bytes())
+val2, err := rt.nodeBucketDB.Get(ctx, node2.Id.Bytes())
 assert.NoError(t, err)
 assert.NotNil(t, val2)
 assert.Equal(t, 0, len(rt.replacementCache[kadBucketID]))
@@ -296,9 +296,9 @@ func TestCreateOrUpdateKBucket(t *testing.T) {
 id := bucketID{255, 255}
 rt := createRoutingTable(teststorj.NodeIDFromString("AA"))
 defer ctx.Check(rt.Close)
-err := rt.createOrUpdateKBucket(id, time.Now())
+err := rt.createOrUpdateKBucket(ctx, id, time.Now())
 assert.NoError(t, err)
-val, e := rt.kadBucketDB.Get(id[:])
+val, e := rt.kadBucketDB.Get(ctx, id[:])
 assert.NotNil(t, val)
 assert.NoError(t, e)

@@ -311,7 +311,7 @@ func TestGetKBucketID(t *testing.T) {
 nodeIDA := teststorj.NodeIDFromString("AA")
 rt := createRoutingTable(nodeIDA)
 defer ctx.Check(rt.Close)
-keyA, err := rt.getKBucketID(nodeIDA)
+keyA, err := rt.getKBucketID(ctx, nodeIDA)
 assert.NoError(t, err)
 assert.Equal(t, kadIDA[:2], keyA[:2])
 }
@@ -354,7 +354,7 @@ func TestWouldBeInNearestK(t *testing.T) {
 result, err := rt.wouldBeInNearestK(testCase.nodeID)
 assert.NoError(t, err)
 assert.Equal(t, testCase.closest, result)
-assert.NoError(t, rt.nodeBucketDB.Put(testCase.nodeID.Bytes(), []byte("")))
+assert.NoError(t, rt.nodeBucketDB.Put(ctx, testCase.nodeID.Bytes(), []byte("")))
 })
 }
 }
@@ -370,7 +370,7 @@ func TestKadBucketContainsLocalNode(t *testing.T) {
 copy(kadIDB[:], kadIDA[:])
 kadIDB[0] = 127
 now := time.Now()
-err := rt.createOrUpdateKBucket(kadIDB, now)
+err := rt.createOrUpdateKBucket(ctx, kadIDB, now)
 assert.NoError(t, err)
 resultTrue, err := rt.kadBucketContainsLocalNode(kadIDA)
 assert.NoError(t, err)
@@ -395,11 +395,11 @@ func TestKadBucketHasRoom(t *testing.T) {
 resultA, err := rt.kadBucketHasRoom(kadIDA)
 assert.NoError(t, err)
 assert.True(t, resultA)
-assert.NoError(t, rt.nodeBucketDB.Put(node2.Bytes(), []byte("")))
-assert.NoError(t, rt.nodeBucketDB.Put(node3.Bytes(), []byte("")))
-assert.NoError(t, rt.nodeBucketDB.Put(node4.Bytes(), []byte("")))
-assert.NoError(t, rt.nodeBucketDB.Put(node5.Bytes(), []byte("")))
-assert.NoError(t, rt.nodeBucketDB.Put(node6.Bytes(), []byte("")))
+assert.NoError(t, rt.nodeBucketDB.Put(ctx, node2.Bytes(), []byte("")))
+assert.NoError(t, rt.nodeBucketDB.Put(ctx, node3.Bytes(), []byte("")))
+assert.NoError(t, rt.nodeBucketDB.Put(ctx, node4.Bytes(), []byte("")))
+assert.NoError(t, rt.nodeBucketDB.Put(ctx, node5.Bytes(), []byte("")))
+assert.NoError(t, rt.nodeBucketDB.Put(ctx, node6.Bytes(), []byte("")))
 resultB, err := rt.kadBucketHasRoom(kadIDA)
 assert.NoError(t, err)
 assert.False(t, resultB)
@@ -416,13 +416,13 @@ func TestGetNodeIDsWithinKBucket(t *testing.T) {
 copy(kadIDB[:], kadIDA[:])
 kadIDB[0] = 127
 now := time.Now()
-assert.NoError(t, rt.createOrUpdateKBucket(kadIDB, now))
+assert.NoError(t, rt.createOrUpdateKBucket(ctx, kadIDB, now))

 nodeIDB := storj.NodeID{111, 255} //[01101111, 1111111]
 nodeIDC := storj.NodeID{47, 255} //[00101111, 1111111]

-assert.NoError(t, rt.nodeBucketDB.Put(nodeIDB.Bytes(), []byte("")))
-assert.NoError(t, rt.nodeBucketDB.Put(nodeIDC.Bytes(), []byte("")))
+assert.NoError(t, rt.nodeBucketDB.Put(ctx, nodeIDB.Bytes(), []byte("")))
+assert.NoError(t, rt.nodeBucketDB.Put(ctx, nodeIDC.Bytes(), []byte("")))

 cases := []struct {
 testID string
@@ -465,12 +465,12 @@ func TestGetNodesFromIDs(t *testing.T) {
 rt := createRoutingTable(nodeA.Id)
 defer ctx.Check(rt.Close)

-assert.NoError(t, rt.nodeBucketDB.Put(nodeA.Id.Bytes(), a))
-assert.NoError(t, rt.nodeBucketDB.Put(nodeB.Id.Bytes(), b))
-assert.NoError(t, rt.nodeBucketDB.Put(nodeC.Id.Bytes(), c))
+assert.NoError(t, rt.nodeBucketDB.Put(ctx, nodeA.Id.Bytes(), a))
+assert.NoError(t, rt.nodeBucketDB.Put(ctx, nodeB.Id.Bytes(), b))
+assert.NoError(t, rt.nodeBucketDB.Put(ctx, nodeC.Id.Bytes(), c))
 expected := []*pb.Node{nodeA, nodeB, nodeC}

-nodeKeys, err := rt.nodeBucketDB.List(nil, 0)
+nodeKeys, err := rt.nodeBucketDB.List(ctx, nil, 0)
 assert.NoError(t, err)
 values, err := rt.getNodesFromIDsBytes(teststorj.NodeIDsFromBytes(nodeKeys.ByteSlices()...))
 assert.NoError(t, err)
@@ -494,10 +494,10 @@ func TestUnmarshalNodes(t *testing.T) {
 assert.NoError(t, err)
 rt := createRoutingTable(nodeA.Id)
 defer ctx.Check(rt.Close)
-assert.NoError(t, rt.nodeBucketDB.Put(nodeA.Id.Bytes(), a))
-assert.NoError(t, rt.nodeBucketDB.Put(nodeB.Id.Bytes(), b))
-assert.NoError(t, rt.nodeBucketDB.Put(nodeC.Id.Bytes(), c))
-nodeKeys, err := rt.nodeBucketDB.List(nil, 0)
+assert.NoError(t, rt.nodeBucketDB.Put(ctx, nodeA.Id.Bytes(), a))
+assert.NoError(t, rt.nodeBucketDB.Put(ctx, nodeB.Id.Bytes(), b))
+assert.NoError(t, rt.nodeBucketDB.Put(ctx, nodeC.Id.Bytes(), c))
+nodeKeys, err := rt.nodeBucketDB.List(ctx, nil, 0)
 assert.NoError(t, err)
 nodes, err := rt.getNodesFromIDsBytes(teststorj.NodeIDsFromBytes(nodeKeys.ByteSlices()...))
 assert.NoError(t, err)
@@ -537,9 +537,9 @@ func TestGetKBucketRange(t *testing.T) {
 idA := storj.NodeID{255, 255}
 idB := storj.NodeID{127, 255}
 idC := storj.NodeID{63, 255}
-assert.NoError(t, rt.kadBucketDB.Put(idA.Bytes(), []byte("")))
-assert.NoError(t, rt.kadBucketDB.Put(idB.Bytes(), []byte("")))
-assert.NoError(t, rt.kadBucketDB.Put(idC.Bytes(), []byte("")))
+assert.NoError(t, rt.kadBucketDB.Put(ctx, idA.Bytes(), []byte("")))
+assert.NoError(t, rt.kadBucketDB.Put(ctx, idB.Bytes(), []byte("")))
+assert.NoError(t, rt.kadBucketDB.Put(ctx, idC.Bytes(), []byte("")))
 zeroBID := bucketID{}
 cases := []struct {
 testID string
@@ -596,7 +596,7 @@ func TestDetermineLeafDepth(t *testing.T) {
 id: idA,
 depth: 0,
 addNode: func() {
-e := rt.kadBucketDB.Put(idA.Bytes(), []byte(""))
+e := rt.kadBucketDB.Put(ctx, idA.Bytes(), []byte(""))
 assert.NoError(t, e)
 },
 },
@@ -604,7 +604,7 @@ func TestDetermineLeafDepth(t *testing.T) {
 id: idB,
 depth: 1,
 addNode: func() {
-e := rt.kadBucketDB.Put(idB.Bytes(), []byte(""))
+e := rt.kadBucketDB.Put(ctx, idB.Bytes(), []byte(""))
 assert.NoError(t, e)
 },
 },
@@ -612,7 +612,7 @@ func TestDetermineLeafDepth(t *testing.T) {
 id: idA,
 depth: 1,
 addNode: func() {
-e := rt.kadBucketDB.Put(idC.Bytes(), []byte(""))
+e := rt.kadBucketDB.Put(ctx, idC.Bytes(), []byte(""))
 assert.NoError(t, e)
 },
 },
@@ -171,7 +171,7 @@ func TestConnectionSuccess(t *testing.T) {
 t.Run(testCase.testID, func(t *testing.T) {
 err := rt.ConnectionSuccess(testCase.node)
 assert.NoError(t, err)
-v, err := rt.nodeBucketDB.Get(testCase.id.Bytes())
+v, err := rt.nodeBucketDB.Get(ctx, testCase.id.Bytes())
 assert.NoError(t, err)
 n, err := unmarshalNodes([]storage.Value{v})
 assert.NoError(t, err)
@@ -190,7 +190,7 @@ func TestConnectionFailed(t *testing.T) {
 defer ctx.Check(rt.Close)
 err := rt.ConnectionFailed(node)
 assert.NoError(t, err)
-v, err := rt.nodeBucketDB.Get(id.Bytes())
+v, err := rt.nodeBucketDB.Get(ctx, id.Bytes())
 assert.Error(t, err)
 assert.Nil(t, v)
 }
@@ -204,7 +204,7 @@ func TestSetBucketTimestamp(t *testing.T) {
 defer ctx.Check(rt.Close)
 now := time.Now().UTC()

-err := rt.createOrUpdateKBucket(keyToBucketID(id.Bytes()), now)
+err := rt.createOrUpdateKBucket(ctx, keyToBucketID(id.Bytes()), now)
 assert.NoError(t, err)
 ti, err := rt.GetBucketTimestamp(id.Bytes())
 assert.Equal(t, now, ti)
@@ -225,7 +225,7 @@ func TestGetBucketTimestamp(t *testing.T) {
 rt := createRoutingTable(id)
 defer ctx.Check(rt.Close)
 now := time.Now().UTC()
-err := rt.createOrUpdateKBucket(keyToBucketID(id.Bytes()), now)
+err := rt.createOrUpdateKBucket(ctx, keyToBucketID(id.Bytes()), now)
 assert.NoError(t, err)
 ti, err := rt.GetBucketTimestamp(id.Bytes())
 assert.Equal(t, now, ti)
@@ -46,7 +46,7 @@ func (repairer *Repairer) Repair(ctx context.Context, path storj.Path) (err erro
 defer mon.Task()(&ctx)(&err)

 // Read the segment pointer from the metainfo
-pointer, err := repairer.metainfo.Get(path)
+pointer, err := repairer.metainfo.Get(ctx, path)
 if err != nil {
 return Error.Wrap(err)
 }
@@ -182,7 +182,7 @@ func (repairer *Repairer) Repair(ctx context.Context, path storj.Path) (err erro
 mon.FloatVal("healthy_ratio_after_repair").Observe(healthyRatioAfterRepair)

 // Update the segment pointer in the metainfo
-return repairer.metainfo.Put(path, pointer)
+return repairer.metainfo.Put(ctx, path, pointer)
 }

 // sliceToSet converts the given slice to a set
@@ -49,14 +49,14 @@ func TestSegmentStoreRepair(t *testing.T) {

 // get a remote segment from metainfo
 metainfo := satellite.Metainfo.Service
-listResponse, _, err := metainfo.List("", "", "", true, 0, 0)
+listResponse, _, err := metainfo.List(ctx, "", "", "", true, 0, 0)
 require.NoError(t, err)

 var path string
 var pointer *pb.Pointer
 for _, v := range listResponse {
 path = v.GetPath()
-pointer, err = metainfo.Get(path)
+pointer, err = metainfo.Get(ctx, path)
 require.NoError(t, err)
 if pointer.GetType() == pb.Pointer_REMOTE {
 break
@@ -121,7 +121,7 @@ func TestSegmentStoreRepair(t *testing.T) {
 assert.Equal(t, newData, testData)

 // updated pointer should not contain any of the killed nodes
-pointer, err = metainfo.Get(path)
+pointer, err = metainfo.Get(ctx, path)
 assert.NoError(t, err)

 remotePieces = pointer.GetRemote().GetRemotePieces()
@@ -123,7 +123,7 @@ func (endpoint *Endpoint) SegmentHealth(ctx context.Context, in *pb.SegmentHealt
 return nil, Error.Wrap(err)
 }

-pointer, err := endpoint.metainfo.Get(path)
+pointer, err := endpoint.metainfo.Get(ctx, path)
 if err != nil {
 return nil, Error.Wrap(err)
 }
@@ -5,6 +5,7 @@ package inspector_test

 import (
 "bytes"
+"context"
 "crypto/rand"
 "fmt"
 "strings"
@@ -38,10 +39,10 @@ func TestInspectorStats(t *testing.T) {
 healthEndpoint := planet.Satellites[0].Inspector.Endpoint

 // Get path of random segment we just uploaded and check the health
-_ = planet.Satellites[0].Metainfo.Database.Iterate(storage.IterateOptions{Recurse: true},
-func(it storage.Iterator) error {
+_ = planet.Satellites[0].Metainfo.Database.Iterate(ctx, storage.IterateOptions{Recurse: true},
+func(ctx context.Context, it storage.Iterator) error {
 var item storage.ListItem
-for it.Next(&item) {
+for it.Next(ctx, &item) {
 if bytes.Contains(item.Key, []byte(fmt.Sprintf("%s/", bucket))) {
 break
 }
@@ -135,7 +135,7 @@ func (endpoint *Endpoint) SegmentInfo(ctx context.Context, req *pb.SegmentInfoRe
 }

 // TODO refactor to use []byte directly
-pointer, err := endpoint.metainfo.Get(path)
+pointer, err := endpoint.metainfo.Get(ctx, path)
 if err != nil {
 if storage.ErrKeyNotFound.Has(err) {
 return nil, status.Errorf(codes.NotFound, err.Error())
@@ -269,7 +269,7 @@ func (endpoint *Endpoint) CommitSegment(ctx context.Context, req *pb.SegmentComm
 // that will be affected is our per-project bandwidth and storage limits.
 }

-err = endpoint.metainfo.Put(path, req.Pointer)
+err = endpoint.metainfo.Put(ctx, path, req.Pointer)
 if err != nil {
 return nil, status.Errorf(codes.Internal, err.Error())
 }
@@ -283,7 +283,7 @@ func (endpoint *Endpoint) CommitSegment(ctx context.Context, req *pb.SegmentComm
 }
 }

-pointer, err := endpoint.metainfo.Get(path)
+pointer, err := endpoint.metainfo.Get(ctx, path)
 if err != nil {
 return nil, status.Errorf(codes.Internal, err.Error())
 }
@@ -329,7 +329,7 @@ func (endpoint *Endpoint) DownloadSegment(ctx context.Context, req *pb.SegmentDo
 }

 // TODO refactor to use []byte directly
-pointer, err := endpoint.metainfo.Get(path)
+pointer, err := endpoint.metainfo.Get(ctx, path)
 if err != nil {
 if storage.ErrKeyNotFound.Has(err) {
 return nil, status.Errorf(codes.NotFound, err.Error())
@@ -385,7 +385,7 @@ func (endpoint *Endpoint) DeleteSegment(ctx context.Context, req *pb.SegmentDele
 }

 // TODO refactor to use []byte directly
-pointer, err := endpoint.metainfo.Get(path)
+pointer, err := endpoint.metainfo.Get(ctx, path)
 if err != nil {
 if storage.ErrKeyNotFound.Has(err) {
 return nil, status.Errorf(codes.NotFound, err.Error())
@@ -393,7 +393,7 @@ func (endpoint *Endpoint) DeleteSegment(ctx context.Context, req *pb.SegmentDele
 return nil, status.Errorf(codes.Internal, err.Error())
 }

-err = endpoint.metainfo.Delete(path)
+err = endpoint.metainfo.Delete(ctx, path)

 if err != nil {
 return nil, status.Errorf(codes.Internal, err.Error())
@@ -443,7 +443,7 @@ func (endpoint *Endpoint) ListSegments(ctx context.Context, req *pb.ListSegments
 return nil, status.Errorf(codes.InvalidArgument, err.Error())
 }

-items, more, err := endpoint.metainfo.List(prefix, string(req.StartAfter), string(req.EndBefore), req.Recursive, req.Limit, req.MetaFlags)
+items, more, err := endpoint.metainfo.List(ctx, prefix, string(req.StartAfter), string(req.EndBefore), req.Recursive, req.Limit, req.MetaFlags)
 if err != nil {
 return nil, status.Errorf(codes.Internal, "ListV2: %v", err)
 }
@@ -28,8 +28,7 @@ func NewService(logger *zap.Logger, db storage.KeyValueStore) *Service {
 }

 // Put puts pointer to db under specific path
-func (s *Service) Put(path string, pointer *pb.Pointer) (err error) {
-ctx := context.TODO()
+func (s *Service) Put(ctx context.Context, path string, pointer *pb.Pointer) (err error) {
 defer mon.Task()(&ctx)(&err)

 // Update the pointer with the creation date
@@ -43,7 +42,7 @@ func (s *Service) Put(path string, pointer *pb.Pointer) (err error) {
 // TODO(kaloyan): make sure that we know we are overwriting the pointer!
 // In such case we should delete the pieces of the old segment if it was
 // a remote one.
-if err = s.DB.Put([]byte(path), pointerBytes); err != nil {
+if err = s.DB.Put(ctx, []byte(path), pointerBytes); err != nil {
 return err
 }

@@ -51,10 +50,9 @@ func (s *Service) Put(path string, pointer *pb.Pointer) (err error) {
 }

 // Get gets pointer from db
-func (s *Service) Get(path string) (pointer *pb.Pointer, err error) {
-ctx := context.TODO()
+func (s *Service) Get(ctx context.Context, path string) (pointer *pb.Pointer, err error) {
 defer mon.Task()(&ctx)(&err)
-pointerBytes, err := s.DB.Get([]byte(path))
+pointerBytes, err := s.DB.Get(ctx, []byte(path))
 if err != nil {
 return nil, err
 }
@@ -69,9 +67,8 @@ func (s *Service) Get(path string) (pointer *pb.Pointer, err error) {
 }

 // List returns all Path keys in the pointers bucket
-func (s *Service) List(prefix string, startAfter string, endBefore string, recursive bool, limit int32,
+func (s *Service) List(ctx context.Context, prefix string, startAfter string, endBefore string, recursive bool, limit int32,
 metaFlags uint32) (items []*pb.ListResponse_Item, more bool, err error) {
-ctx := context.TODO()
 defer mon.Task()(&ctx)(&err)

 var prefixKey storage.Key
@@ -82,7 +79,7 @@ func (s *Service) List(prefix string, startAfter string, endBefore string, recur
 }
 }

-rawItems, more, err := storage.ListV2(s.DB, storage.ListOptions{
+rawItems, more, err := storage.ListV2(ctx, s.DB, storage.ListOptions{
 Prefix: prefixKey,
 StartAfter: storage.Key(startAfter),
 EndBefore: storage.Key(endBefore),
@@ -152,15 +149,13 @@ func (s *Service) setMetadata(item *pb.ListResponse_Item, data []byte, metaFlags
 }

 // Delete deletes from item from db
-func (s *Service) Delete(path string) (err error) {
-ctx := context.TODO()
+func (s *Service) Delete(ctx context.Context, path string) (err error) {
 defer mon.Task()(&ctx)(&err)
-return s.DB.Delete([]byte(path))
+return s.DB.Delete(ctx, []byte(path))
 }

 // Iterate iterates over items in db
-func (s *Service) Iterate(prefix string, first string, recurse bool, reverse bool, f func(it storage.Iterator) error) (err error) {
-ctx := context.TODO()
+func (s *Service) Iterate(ctx context.Context, prefix string, first string, recurse bool, reverse bool, f func(context.Context, storage.Iterator) error) (err error) {
 defer mon.Task()(&ctx)(&err)
 opts := storage.IterateOptions{
 Prefix: storage.Key(prefix),
@@ -168,5 +163,5 @@ func (s *Service) Iterate(prefix string, first string, recurse bool, reverse boo
 Recurse: recurse,
 Reverse: reverse,
 }
-return s.DB.Iterate(opts, f)
+return s.DB.Iterate(ctx, opts, f)
 }
@ -5,12 +5,13 @@ package boltdb
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/boltdb/bolt"
|
||||
"github.com/zeebo/errs"
|
||||
"gopkg.in/spacemonkeygo/monkit.v2"
|
||||
monkit "gopkg.in/spacemonkeygo/monkit.v2"
|
||||
|
||||
"storj.io/storj/storage"
|
||||
)
|
||||
@ -126,13 +127,14 @@ func (client *Client) view(fn func(*bolt.Bucket) error) error {
|
||||
// Ref: https://github.com/boltdb/bolt/blob/master/db.go#L160
|
||||
// Note: when using this method, check if it need to be executed asynchronously
|
||||
// since it blocks for the duration db.MaxBatchDelay.
|
||||
func (client *Client) Put(key storage.Key, value storage.Value) error {
|
||||
func (client *Client) Put(ctx context.Context, key storage.Key, value storage.Value) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
start := time.Now()
|
||||
if key.IsZero() {
|
||||
return storage.ErrEmptyKey.New("")
|
||||
}
|
||||
|
||||
err := client.batch(func(bucket *bolt.Bucket) error {
|
||||
err = client.batch(func(bucket *bolt.Bucket) error {
|
||||
return bucket.Put(key, value)
|
||||
})
|
||||
mon.IntVal("boltdb_batch_time_elapsed").Observe(int64(time.Since(start)))
|
||||
@ -140,7 +142,8 @@ func (client *Client) Put(key storage.Key, value storage.Value) error {
|
||||
}
|
||||
|
||||
// PutAndCommit adds a key/value to BoltDB and writes it to disk.
|
||||
func (client *Client) PutAndCommit(key storage.Key, value storage.Value) error {
|
||||
func (client *Client) PutAndCommit(ctx context.Context, key storage.Key, value storage.Value) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
if key.IsZero() {
|
||||
return storage.ErrEmptyKey.New("")
|
||||
}
|
||||
@ -151,13 +154,14 @@ func (client *Client) PutAndCommit(key storage.Key, value storage.Value) error {
|
||||
}
|
||||
|
||||
// Get looks up the provided key from boltdb returning either an error or the result.
|
||||
func (client *Client) Get(key storage.Key) (storage.Value, error) {
|
||||
func (client *Client) Get(ctx context.Context, key storage.Key) (_ storage.Value, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
if key.IsZero() {
|
||||
return nil, storage.ErrEmptyKey.New("")
|
||||
}
|
||||
|
||||
var value storage.Value
|
||||
err := client.view(func(bucket *bolt.Bucket) error {
|
||||
err = client.view(func(bucket *bolt.Bucket) error {
|
||||
data := bucket.Get([]byte(key))
|
||||
if len(data) == 0 {
|
||||
return storage.ErrKeyNotFound.New(key.String())
|
||||
@ -169,7 +173,8 @@ func (client *Client) Get(key storage.Key) (storage.Value, error) {
|
||||
}
|
||||
|
||||
// Delete deletes a key/value pair from boltdb, for a given the key
|
||||
func (client *Client) Delete(key storage.Key) error {
|
||||
func (client *Client) Delete(ctx context.Context, key storage.Key) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
if key.IsZero() {
|
||||
return storage.ErrEmptyKey.New("")
|
||||
}
|
||||
@ -180,13 +185,14 @@ func (client *Client) Delete(key storage.Key) error {
|
||||
}
|
||||
|
||||
// List returns either a list of keys for which boltdb has values or an error.
|
||||
func (client *Client) List(first storage.Key, limit int) (storage.Keys, error) {
|
||||
rv, err := storage.ListKeys(client, first, limit)
|
||||
func (client *Client) List(ctx context.Context, first storage.Key, limit int) (_ storage.Keys, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
rv, err := storage.ListKeys(ctx, client, first, limit)
|
||||
return rv, Error.Wrap(err)
|
||||
}
|
||||
|
||||
// Close closes a BoltDB client
|
||||
func (client *Client) Close() error {
|
||||
func (client *Client) Close() (err error) {
|
||||
if atomic.AddInt32(client.referenceCount, -1) == 0 {
|
||||
return Error.Wrap(client.db.Close())
|
||||
}
|
||||
@ -195,13 +201,14 @@ func (client *Client) Close() error {
|
||||
|
||||
// GetAll finds all values for the provided keys (up to storage.LookupLimit).
|
||||
// If more keys are provided than the maximum, an error will be returned.
|
||||
func (client *Client) GetAll(keys storage.Keys) (storage.Values, error) {
|
||||
func (client *Client) GetAll(ctx context.Context, keys storage.Keys) (_ storage.Values, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
if len(keys) > storage.LookupLimit {
|
||||
return nil, storage.ErrLimitExceeded
|
||||
}
|
||||
|
||||
vals := make(storage.Values, 0, len(keys))
|
||||
err := client.view(func(bucket *bolt.Bucket) error {
|
||||
err = client.view(func(bucket *bolt.Bucket) error {
|
||||
for _, key := range keys {
|
||||
val := bucket.Get([]byte(key))
|
||||
if val == nil {
|
||||
@ -216,7 +223,8 @@ func (client *Client) GetAll(keys storage.Keys) (storage.Values, error) {
|
||||
}
|
||||
|
||||
// Iterate iterates over items based on opts
|
||||
func (client *Client) Iterate(opts storage.IterateOptions, fn func(storage.Iterator) error) error {
|
||||
func (client *Client) Iterate(ctx context.Context, opts storage.IterateOptions, fn func(context.Context, storage.Iterator) error) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
return client.view(func(bucket *bolt.Bucket) error {
|
||||
var cursor advancer
|
||||
if !opts.Reverse {
|
||||
@ -229,7 +237,7 @@ func (client *Client) Iterate(opts storage.IterateOptions, fn func(storage.Itera
|
||||
lastPrefix := []byte{}
|
||||
wasPrefix := false
|
||||
|
||||
return fn(storage.IteratorFunc(func(item *storage.ListItem) bool {
|
||||
return fn(ctx, storage.IteratorFunc(func(ctx context.Context, item *storage.ListItem) bool {
|
||||
var key, value []byte
|
||||
if start {
|
||||
key, value = cursor.PositionToFirst(opts.Prefix, opts.First)
|
||||
|
@ -4,6 +4,7 @@
package boltdb

import (
"context"
"fmt"
"io/ioutil"
"os"
@ -17,6 +18,8 @@ import (
"storj.io/storj/storage/testsuite"
)

var ctx = context.Background() // test context

func TestSuite(t *testing.T) {
tempdir, err := ioutil.TempDir("", "storj-bolt")
if err != nil {
@ -96,8 +99,8 @@ func (store *boltLongBenchmarkStore) BulkImport(iter storage.Iterator) (err error) {
defer func() { store.db.NoSync = oldval }()

var item storage.ListItem
for iter.Next(&item) {
if err := store.Put(item.Key, item.Value); err != nil {
for iter.Next(ctx, &item) {
if err := store.Put(ctx, item.Key, item.Value); err != nil {
return fmt.Errorf("Failed to insert data (%q, %q): %v", item.Key, item.Value, err)
}
}
@ -172,7 +175,7 @@ func BenchmarkClientWrite(b *testing.B) {
wg.Add(1)
go func() {
defer wg.Done()
err := kdb.PutAndCommit(key, value)
err := kdb.PutAndCommit(ctx, key, value)
if err != nil {
b.Fatal("Put err:", err)
}
@ -214,7 +217,7 @@ func BenchmarkClientNoSyncWrite(b *testing.B) {
wg.Add(1)
go func() {
defer wg.Done()
err := kdb.PutAndCommit(key, value)
err := kdb.PutAndCommit(ctx, key, value)
if err != nil {
b.Fatal("PutAndCommit Nosync err:", err)
}
@ -262,7 +265,7 @@ func BenchmarkClientBatchWrite(b *testing.B) {
wg.Add(1)
go func() {
defer wg.Done()
err := kdb.Put(key, value)
err := kdb.Put(ctx, key, value)
if err != nil {
b.Fatalf("boltDB put: %v\n", err)
}
@ -310,7 +313,7 @@ func BenchmarkClientBatchNoSyncWrite(b *testing.B) {
wg.Add(1)
go func() {
defer wg.Done()
err := kdb.Put(key, value)
err := kdb.Put(ctx, key, value)
if err != nil {
b.Fatalf("boltDB put: %v\n", err)
}

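With every store method now taking a context first, the test files pick up a package-level var ctx = context.Background() instead of threading a context through each benchmark and helper. A hypothetical round-trip test in the same style, written against the in-memory store (assuming teststore.New, which the storage testsuite uses):

    package example

    import (
        "context"
        "testing"

        "storj.io/storj/storage"
        "storj.io/storj/storage/teststore"
    )

    var ctx = context.Background() // test context

    // TestRoundTrip is illustrative: the shared ctx keeps the
    // context-first call sites as terse as the old API.
    func TestRoundTrip(t *testing.T) {
        store := teststore.New()
        if err := store.Put(ctx, storage.Key("k"), storage.Value("v")); err != nil {
            t.Fatal(err)
        }
        got, err := store.Get(ctx, storage.Key("k"))
        if err != nil || string(got) != "v" {
            t.Fatalf("got %q, err %v", got, err)
        }
    }
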
@ -5,11 +5,15 @@ package storage

import (
"bytes"
"context"
"errors"

"github.com/zeebo/errs"
monkit "gopkg.in/spacemonkeygo/monkit.v2"
)

var mon = monkit.Package()

// Delimiter separates nested paths in storage
const Delimiter = '/'

@ -53,17 +57,17 @@ type ListItem struct {
// KeyValueStore describes key/value stores like redis and boltdb
type KeyValueStore interface {
// Put adds a value to store
Put(Key, Value) error
Put(context.Context, Key, Value) error
// Get gets a value to store
Get(Key) (Value, error)
Get(context.Context, Key) (Value, error)
// GetAll gets all values from the store
GetAll(Keys) (Values, error)
GetAll(context.Context, Keys) (Values, error)
// Delete deletes key and the value
Delete(Key) error
Delete(context.Context, Key) error
// List lists all keys starting from start and upto limit items
List(start Key, limit int) (Keys, error)
List(ctx context.Context, start Key, limit int) (Keys, error)
// Iterate iterates over items based on opts
Iterate(opts IterateOptions, fn func(Iterator) error) error
Iterate(ctx context.Context, opts IterateOptions, fn func(context.Context, Iterator) error) error
// Close closes the store
Close() error
}
@ -84,7 +88,7 @@ type IterateOptions struct {
type Iterator interface {
// Next prepares the next list item.
// It returns true on success, or false if there is no next result row or an error happened while preparing it.
Next(item *ListItem) bool
Next(ctx context.Context, item *ListItem) bool
}

// IsZero returns true if the value struct is it's zero value

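Changing KeyValueStore and Iterator ripples through every implementation, which is why this commit touches bolt, postgres, redis, the logger, and the test store in one pass. A cheap guard for that kind of interface change is a compile-time assertion; a sketch of the idiom (the assertion itself is illustrative, the commit simply relies on call sites failing to build):

    package example

    import (
        "storj.io/storj/storage"
        "storj.io/storj/storage/teststore"
    )

    // If an implementation misses the new context parameters, this
    // assignment stops compiling.
    var _ storage.KeyValueStore = (*teststore.Client)(nil)
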
@ -5,14 +5,15 @@ package storage

import (
"bytes"
"context"
"sort"
)

// IteratorFunc implements basic iterator
type IteratorFunc func(item *ListItem) bool
type IteratorFunc func(ctx context.Context, item *ListItem) bool

// Next returns the next item
func (next IteratorFunc) Next(item *ListItem) bool { return next(item) }
func (next IteratorFunc) Next(ctx context.Context, item *ListItem) bool { return next(ctx, item) }

// SelectPrefixed keeps only items that have prefix
// items will be reused and modified
@ -86,7 +87,7 @@ type StaticIterator struct {
}

// Next returns the next item from the iterator
func (it *StaticIterator) Next(item *ListItem) bool {
func (it *StaticIterator) Next(ctx context.Context, item *ListItem) bool {
if it.Index >= len(it.Items) {
return false
}

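IteratorFunc is the function-adapter pattern (compare http.HandlerFunc): a bare closure satisfies Iterator, so each store can pass its iteration logic to fn without declaring a named type, and the ctx parameter added here is what lets cancellation and the per-Next instrumentation reach that closure. A small sketch:

    package example

    import (
        "context"

        "storj.io/storj/storage"
    )

    // emptyIterator yields nothing; the closure itself is the whole
    // Iterator implementation, via the IteratorFunc adapter.
    func emptyIterator() storage.Iterator {
        return storage.IteratorFunc(func(ctx context.Context, item *storage.ListItem) bool {
            return false // no items
        })
    }
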
@ -3,20 +3,25 @@

package storage

import (
"context"
)

// ListKeys returns keys starting from first and upto limit
// limit is capped to LookupLimit
func ListKeys(store KeyValueStore, first Key, limit int) (Keys, error) {
func ListKeys(ctx context.Context, store KeyValueStore, first Key, limit int) (_ Keys, err error) {
defer mon.Task()(&ctx)(&err)
if limit <= 0 || limit > LookupLimit {
limit = LookupLimit
}

keys := make(Keys, 0, limit)
err := store.Iterate(IterateOptions{
err = store.Iterate(ctx, IterateOptions{
First: first,
Recurse: true,
}, func(it Iterator) error {
}, func(ctx context.Context, it Iterator) error {
var item ListItem
for ; limit > 0 && it.Next(&item); limit-- {
for ; limit > 0 && it.Next(ctx, &item); limit-- {
if item.Key == nil {
panic("nil key")
}
@ -30,19 +35,20 @@ func ListKeys(store KeyValueStore, first Key, limit int) (Keys, error) {

// ReverseListKeys returns keys starting from first and upto limit in reverse order
// limit is capped to LookupLimit
func ReverseListKeys(store KeyValueStore, first Key, limit int) (Keys, error) {
func ReverseListKeys(ctx context.Context, store KeyValueStore, first Key, limit int) (_ Keys, err error) {
defer mon.Task()(&ctx)(&err)
if limit <= 0 || limit > LookupLimit {
limit = LookupLimit
}

keys := make(Keys, 0, limit)
err := store.Iterate(IterateOptions{
err = store.Iterate(ctx, IterateOptions{
First: first,
Recurse: true,
Reverse: true,
}, func(it Iterator) error {
}, func(ctx context.Context, it Iterator) error {
var item ListItem
for ; limit > 0 && it.Next(&item); limit-- {
for ; limit > 0 && it.Next(ctx, &item); limit-- {
if item.Key == nil {
panic("nil key")
}

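A usage sketch for the updated helper: the context handed to ListKeys flows through store.Iterate into every it.Next call, so one caller-supplied context covers the whole listing.

    package example

    import (
        "context"

        "storj.io/storj/storage"
    )

    // firstTen lists up to 10 keys at or after "a"; ListKeys caps the
    // limit at storage.LookupLimit if a larger value is requested.
    func firstTen(ctx context.Context, store storage.KeyValueStore) (storage.Keys, error) {
        return storage.ListKeys(ctx, store, storage.Key("a"), 10)
    }
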
@ -4,6 +4,7 @@
package storage

import (
"context"
"errors"
)

@ -24,7 +25,8 @@ type ListOptions struct {
// then the result []ListItem includes all requested keys.
// If true then the caller must call List again to get more
// results by setting `StartAfter` or `EndBefore` appropriately.
func ListV2(store KeyValueStore, opts ListOptions) (result Items, more bool, err error) {
func ListV2(ctx context.Context, store KeyValueStore, opts ListOptions) (result Items, more bool, err error) {
defer mon.Task()(&ctx)(&err)
if !opts.StartAfter.IsZero() && !opts.EndBefore.IsZero() {
return nil, false, errors.New("start-after and end-before cannot be combined")
}
@ -44,11 +46,11 @@ func ListV2(store KeyValueStore, opts ListOptions) (result Items, more bool, err error) {
first = opts.EndBefore
}

iterate := func(it Iterator) error {
iterate := func(ctx context.Context, it Iterator) error {
var item ListItem
skipFirst := true
for ; limit > 0; limit-- {
if !it.Next(&item) {
if !it.Next(ctx, &item) {
more = false
return nil
}
@ -79,7 +81,7 @@ func ListV2(store KeyValueStore, opts ListOptions) (result Items, more bool, err error) {
}

// we still need to consume one item for the more flag
more = it.Next(&item)
more = it.Next(ctx, &item)
return nil
}

@ -90,7 +92,7 @@ func ListV2(store KeyValueStore, opts ListOptions) (result Items, more bool, err error) {
if reverse && !opts.EndBefore.IsZero() {
firstFull = joinKey(opts.Prefix, opts.EndBefore)
}
err = store.Iterate(IterateOptions{
err = store.Iterate(ctx, IterateOptions{
Prefix: opts.Prefix,
First: firstFull,
Reverse: reverse,

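For completeness, a usage sketch of the context-aware ListV2, using only option fields that appear elsewhere in this commit (StartAfter and Limit; other ListOptions fields are left at their zero values):

    package example

    import (
        "context"

        "storj.io/storj/storage"
    )

    // pageAfterGamma fetches one page of up to 5 items after "gamma";
    // more reports whether another call would return further results.
    func pageAfterGamma(ctx context.Context, store storage.KeyValueStore) (storage.Items, bool, error) {
        return storage.ListV2(ctx, store, storage.ListOptions{
            StartAfter: storage.Key("gamma"),
            Limit:      5,
        })
    }
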
@ -5,6 +5,7 @@
package postgreskv

import (
"context"
"database/sql"

"github.com/zeebo/errs"
@ -110,9 +111,10 @@ type alternateOrderedPostgresIterator struct {
*orderedPostgresIterator
}

func (opi *alternateOrderedPostgresIterator) doNextQuery() (*sql.Rows, error) {
func (opi *alternateOrderedPostgresIterator) doNextQuery(ctx context.Context) (_ *sql.Rows, err error) {
defer mon.Task()(&ctx)(&err)
if opi.opts.Recurse {
return opi.orderedPostgresIterator.doNextQuery()
return opi.orderedPostgresIterator.doNextQuery(ctx)
}
start := opi.lastKeySeen
if start == nil {
@ -127,7 +129,8 @@ func (opi *alternateOrderedPostgresIterator) doNextQuery() (*sql.Rows, error) {
return opi.client.pgConn.Query(query, []byte(opi.bucket), []byte(opi.opts.Prefix), []byte(start), opi.batchSize+1)
}

func newAlternateOrderedPostgresIterator(altClient *AlternateClient, opts storage.IterateOptions, batchSize int) (*alternateOrderedPostgresIterator, error) {
func newAlternateOrderedPostgresIterator(ctx context.Context, altClient *AlternateClient, opts storage.IterateOptions, batchSize int) (_ *alternateOrderedPostgresIterator, err error) {
defer mon.Task()(&ctx)(&err)
if opts.Prefix == nil {
opts.Prefix = storage.Key("")
}
@ -144,7 +147,7 @@ func newAlternateOrderedPostgresIterator(altClient *AlternateClient, opts storage.IterateOptions, batchSize int) (*alternateOrderedPostgresIterator, error) {
}
opi := &alternateOrderedPostgresIterator{orderedPostgresIterator: opi1}
opi.nextQuery = opi.doNextQuery
newRows, err := opi.nextQuery()
newRows, err := opi.nextQuery(ctx)
if err != nil {
return nil, err
}
@ -153,8 +156,9 @@
}

// Iterate iterates over items based on opts
func (altClient *AlternateClient) Iterate(opts storage.IterateOptions, fn func(storage.Iterator) error) (err error) {
opi, err := newAlternateOrderedPostgresIterator(altClient, opts, defaultBatchSize)
func (altClient *AlternateClient) Iterate(ctx context.Context, opts storage.IterateOptions, fn func(context.Context, storage.Iterator) error) (err error) {
defer mon.Task()(&ctx)(&err)
opi, err := newAlternateOrderedPostgresIterator(ctx, altClient, opts, defaultBatchSize)
if err != nil {
return err
}
@ -162,5 +166,5 @@ func (altClient *AlternateClient) Iterate(opts storage.IterateOptions, fn func(storage.Iterator) error) (err error) {
err = errs.Combine(err, opi.Close())
}()

return fn(opi)
return fn(ctx, opi)
}

@ -4,6 +4,7 @@
package postgreskv

import (
"context"
"database/sql"
"fmt"

@ -51,12 +52,14 @@ func New(dbURL string) (*Client, error) {
}

// Put sets the value for the provided key.
func (client *Client) Put(key storage.Key, value storage.Value) error {
return client.PutPath(storage.Key(defaultBucket), key, value)
func (client *Client) Put(ctx context.Context, key storage.Key, value storage.Value) (err error) {
defer mon.Task()(&ctx)(&err)
return client.PutPath(ctx, storage.Key(defaultBucket), key, value)
}

// PutPath sets the value for the provided key (in the given bucket).
func (client *Client) PutPath(bucket, key storage.Key, value storage.Value) error {
func (client *Client) PutPath(ctx context.Context, bucket, key storage.Key, value storage.Value) (err error) {
defer mon.Task()(&ctx)(&err)
if key.IsZero() {
return storage.ErrEmptyKey.New("")
}
@ -65,17 +68,19 @@ func (client *Client) PutPath(bucket, key storage.Key, value storage.Value) error {
VALUES ($1::BYTEA, $2::BYTEA, $3::BYTEA)
ON CONFLICT (bucket, fullpath) DO UPDATE SET metadata = EXCLUDED.metadata
`
_, err := client.pgConn.Exec(q, []byte(bucket), []byte(key), []byte(value))
_, err = client.pgConn.Exec(q, []byte(bucket), []byte(key), []byte(value))
return err
}

// Get looks up the provided key and returns its value (or an error).
func (client *Client) Get(key storage.Key) (storage.Value, error) {
return client.GetPath(storage.Key(defaultBucket), key)
func (client *Client) Get(ctx context.Context, key storage.Key) (_ storage.Value, err error) {
defer mon.Task()(&ctx)(&err)
return client.GetPath(ctx, storage.Key(defaultBucket), key)
}

// GetPath looks up the provided key (in the given bucket) and returns its value (or an error).
func (client *Client) GetPath(bucket, key storage.Key) (storage.Value, error) {
func (client *Client) GetPath(ctx context.Context, bucket, key storage.Key) (_ storage.Value, err error) {
defer mon.Task()(&ctx)(&err)
if key.IsZero() {
return nil, storage.ErrEmptyKey.New("")
}
@ -83,7 +88,7 @@ func (client *Client) GetPath(bucket, key storage.Key) (storage.Value, error) {
q := "SELECT metadata FROM pathdata WHERE bucket = $1::BYTEA AND fullpath = $2::BYTEA"
row := client.pgConn.QueryRow(q, []byte(bucket), []byte(key))
var val []byte
err := row.Scan(&val)
err = row.Scan(&val)
if err == sql.ErrNoRows {
return nil, storage.ErrKeyNotFound.New(key.String())
}
@ -94,12 +99,14 @@
}

// Delete deletes the given key and its associated value.
func (client *Client) Delete(key storage.Key) error {
return client.DeletePath(storage.Key(defaultBucket), key)
func (client *Client) Delete(ctx context.Context, key storage.Key) (err error) {
defer mon.Task()(&ctx)(&err)
return client.DeletePath(ctx, storage.Key(defaultBucket), key)
}

// DeletePath deletes the given key (in the given bucket) and its associated value.
func (client *Client) DeletePath(bucket, key storage.Key) error {
func (client *Client) DeletePath(ctx context.Context, bucket, key storage.Key) (err error) {
defer mon.Task()(&ctx)(&err)
if key.IsZero() {
return storage.ErrEmptyKey.New("")
}
@ -120,8 +127,9 @@
}

// List returns either a list of known keys, in order, or an error.
func (client *Client) List(first storage.Key, limit int) (storage.Keys, error) {
return storage.ListKeys(client, first, limit)
func (client *Client) List(ctx context.Context, first storage.Key, limit int) (_ storage.Keys, err error) {
defer mon.Task()(&ctx)(&err)
return storage.ListKeys(ctx, client, first, limit)
}

// Close closes the client
@ -131,14 +139,16 @@ func (client *Client) Close() error {

// GetAll finds all values for the provided keys (up to storage.LookupLimit).
// If more keys are provided than the maximum, an error will be returned.
func (client *Client) GetAll(keys storage.Keys) (storage.Values, error) {
return client.GetAllPath(storage.Key(defaultBucket), keys)
func (client *Client) GetAll(ctx context.Context, keys storage.Keys) (_ storage.Values, err error) {
defer mon.Task()(&ctx)(&err)
return client.GetAllPath(ctx, storage.Key(defaultBucket), keys)
}

// GetAllPath finds all values for the provided keys (up to storage.LookupLimit)
// in the given bucket. if more keys are provided than the maximum, an error
// will be returned.
func (client *Client) GetAllPath(bucket storage.Key, keys storage.Keys) (storage.Values, error) {
func (client *Client) GetAllPath(ctx context.Context, bucket storage.Key, keys storage.Keys) (_ storage.Values, err error) {
defer mon.Task()(&ctx)(&err)
if len(keys) > storage.LookupLimit {
return nil, storage.ErrLimitExceeded
}
@ -176,11 +186,12 @@ type orderedPostgresIterator struct {
curRows *sql.Rows
lastKeySeen storage.Key
errEncountered error
nextQuery func() (*sql.Rows, error)
nextQuery func(context.Context) (*sql.Rows, error)
}

// Next fills in info for the next item in an ongoing listing.
func (opi *orderedPostgresIterator) Next(item *storage.ListItem) bool {
func (opi *orderedPostgresIterator) Next(ctx context.Context, item *storage.ListItem) bool {
defer mon.Task()(&ctx)(nil)
if !opi.curRows.Next() {
if err := opi.curRows.Close(); err != nil {
opi.errEncountered = errs.Wrap(err)
@ -193,7 +204,7 @@ func (opi *orderedPostgresIterator) Next(item *storage.ListItem) bool {
opi.errEncountered = errs.Wrap(err)
return false
}
newRows, err := opi.nextQuery()
newRows, err := opi.nextQuery(ctx)
if err != nil {
opi.errEncountered = errs.Wrap(err)
return false
@ -217,7 +228,7 @@ func (opi *orderedPostgresIterator) Next(item *storage.ListItem) bool {
item.Value = storage.Value(v)
opi.curIndex++
if opi.curIndex == 1 && opi.lastKeySeen.Equal(item.Key) {
return opi.Next(item)
return opi.Next(ctx, item)
}
if !opi.opts.Recurse && item.Key[len(item.Key)-1] == opi.delimiter && !item.Key.Equal(opi.opts.Prefix) {
item.IsPrefix = true
@ -230,7 +241,8 @@ func (opi *orderedPostgresIterator) Next(item *storage.ListItem) bool {
return true
}

func (opi *orderedPostgresIterator) doNextQuery() (*sql.Rows, error) {
func (opi *orderedPostgresIterator) doNextQuery(ctx context.Context) (_ *sql.Rows, err error) {
defer mon.Task()(&ctx)(&err)
start := opi.lastKeySeen
if start == nil {
start = opi.opts.First
@ -267,7 +279,8 @@ func (opi *orderedPostgresIterator) Close() error {
return errs.Combine(opi.errEncountered, opi.curRows.Close())
}

func newOrderedPostgresIterator(pgClient *Client, opts storage.IterateOptions, batchSize int) (*orderedPostgresIterator, error) {
func newOrderedPostgresIterator(ctx context.Context, pgClient *Client, opts storage.IterateOptions, batchSize int) (_ *orderedPostgresIterator, err error) {
defer mon.Task()(&ctx)(&err)
if opts.Prefix == nil {
opts.Prefix = storage.Key("")
}
@ -283,7 +296,7 @@ func newOrderedPostgresIterator(pgClient *Client, opts storage.IterateOptions, batchSize int) {
curIndex: 0,
}
opi.nextQuery = opi.doNextQuery
newRows, err := opi.nextQuery()
newRows, err := opi.nextQuery(ctx)
if err != nil {
return nil, err
}
@ -292,8 +305,9 @@
}

// Iterate iterates over items based on opts
func (client *Client) Iterate(opts storage.IterateOptions, fn func(storage.Iterator) error) (err error) {
opi, err := newOrderedPostgresIterator(client, opts, defaultBatchSize)
func (client *Client) Iterate(ctx context.Context, opts storage.IterateOptions, fn func(context.Context, storage.Iterator) error) (err error) {
defer mon.Task()(&ctx)(&err)
opi, err := newOrderedPostgresIterator(ctx, client, opts, defaultBatchSize)
if err != nil {
return err
}
@ -301,5 +315,5 @@
err = errs.Combine(err, opi.Close())
}()

return fn(opi)
return fn(ctx, opi)
}

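A detail worth noticing in the hunks above: wherever a function gains the named err result for the monkit defer, the first inner err := ... becomes a plain assignment (err = ...), as in PutPath and GetPath. In the same scope := would not compile ("no new variables on left side of :="), and inside a nested block it would quietly declare a fresh err that shadows the named result, leaving the deferred task to record success regardless of what failed. A minimal sketch of the pitfall (doWork is an assumed helper):

    package example

    import (
        "context"

        monkit "gopkg.in/spacemonkeygo/monkit.v2"
    )

    var mon = monkit.Package()

    func doWork(ctx context.Context) error { return nil }

    // instrumented shows the trap avoided by the err := to err = rewrites.
    func instrumented(ctx context.Context) (err error) {
        defer mon.Task()(&ctx)(&err) // reads the named err on return

        {
            // Illustrative trap: `err := doWork(ctx)` in this nested block
            // would shadow the named result; the defer would then see nil.
            err = doWork(ctx) // correct: assign to the named result
        }
        return err
    }
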
@ -4,6 +4,7 @@
package postgreskv

import (
"context"
"database/sql"
"testing"

@ -15,6 +16,8 @@ import (
"storj.io/storj/storage/testsuite"
)

var ctx = context.Background() // test context

func newTestPostgres(t testing.TB) (store *Client, cleanup func()) {
if *pgtest.ConnStr == "" {
t.Skipf("postgres flag missing, example:\n-postgres-test-db=%s", pgtest.DefaultConnStr)
@ -73,7 +76,7 @@ func bulkImport(db *sql.DB, iter storage.Iterator) (err error) {
}()

var item storage.ListItem
for iter.Next(&item) {
for iter.Next(ctx, &item) {
if _, err := stmt.Exec([]byte(""), []byte(item.Key), []byte(item.Value)); err != nil {
return err
}

@ -4,6 +4,7 @@
package redis

import (
"context"
"net/url"
"sort"
"strconv"
@ -11,6 +12,7 @@ import (

"github.com/go-redis/redis"
"github.com/zeebo/errs"
monkit "gopkg.in/spacemonkeygo/monkit.v2"

"storj.io/storj/storage"
)
@ -18,6 +20,8 @@
var (
// Error is a redis error
Error = errs.Class("redis error")

mon = monkit.Package()
)

// TODO(coyle): this should be set to 61 * time.Minute after we implement Ping and Refresh on Overlay Cache
@ -71,7 +75,8 @@ func NewClientFrom(address string) (*Client, error) {
}

// Get looks up the provided key from redis returning either an error or the result.
func (client *Client) Get(key storage.Key) (storage.Value, error) {
func (client *Client) Get(ctx context.Context, key storage.Key) (_ storage.Value, err error) {
defer mon.Task()(&ctx)(&err)
if key.IsZero() {
return nil, storage.ErrEmptyKey.New("")
}
@ -87,12 +92,13 @@
}

// Put adds a value to the provided key in redis, returning an error on failure.
func (client *Client) Put(key storage.Key, value storage.Value) error {
func (client *Client) Put(ctx context.Context, key storage.Key, value storage.Value) (err error) {
defer mon.Task()(&ctx)(&err)
if key.IsZero() {
return storage.ErrEmptyKey.New("")
}

err := client.db.Set(key.String(), []byte(value), client.TTL).Err()
err = client.db.Set(key.String(), []byte(value), client.TTL).Err()
if err != nil {
return Error.New("put error: %v", err)
}
@ -100,17 +106,19 @@
}

// List returns either a list of keys for which boltdb has values or an error.
func (client *Client) List(first storage.Key, limit int) (storage.Keys, error) {
return storage.ListKeys(client, first, limit)
func (client *Client) List(ctx context.Context, first storage.Key, limit int) (_ storage.Keys, err error) {
defer mon.Task()(&ctx)(&err)
return storage.ListKeys(ctx, client, first, limit)
}

// Delete deletes a key/value pair from redis, for a given the key
func (client *Client) Delete(key storage.Key) error {
func (client *Client) Delete(ctx context.Context, key storage.Key) (err error) {
defer mon.Task()(&ctx)(&err)
if key.IsZero() {
return storage.ErrEmptyKey.New("")
}

err := client.db.Del(key.String()).Err()
err = client.db.Del(key.String()).Err()
if err != nil {
return Error.New("delete error: %v", err)
}
@ -125,7 +133,8 @@ func (client *Client) Close() error {
// GetAll is the bulk method for gets from the redis data store.
// The maximum keys returned will be storage.LookupLimit. If more than that
// is requested, an error will be returned
func (client *Client) GetAll(keys storage.Keys) (storage.Values, error) {
func (client *Client) GetAll(ctx context.Context, keys storage.Keys) (_ storage.Values, err error) {
defer mon.Task()(&ctx)(&err)
if len(keys) > storage.LookupLimit {
return nil, storage.ErrLimitExceeded
}
@ -156,9 +165,9 @@
}

// Iterate iterates over items based on opts
func (client *Client) Iterate(opts storage.IterateOptions, fn func(it storage.Iterator) error) error {
func (client *Client) Iterate(ctx context.Context, opts storage.IterateOptions, fn func(context.Context, storage.Iterator) error) (err error) {
defer mon.Task()(&ctx)(&err)
var all storage.Items
var err error
if !opts.Reverse {
all, err = client.allPrefixedItems(opts.Prefix, opts.First, nil)
} else {
@ -173,7 +182,7 @@
if opts.Reverse {
all = storage.ReverseItems(all)
}
return fn(&storage.StaticIterator{
return fn(ctx, &storage.StaticIterator{
Items: all,
})
}

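Unlike bolt and postgres, the redis client has no ordered server-side cursor to walk, so Iterate materializes the matching items up front and then serves them through storage.StaticIterator, which now also implements the context-aware Next. A condensed sketch of that strategy:

    package example

    import (
        "context"

        "storj.io/storj/storage"
    )

    // iterateStatic mirrors the redis approach: load everything first,
    // then expose it behind the same Iterator interface as other stores.
    func iterateStatic(ctx context.Context, all storage.Items, fn func(context.Context, storage.Iterator) error) error {
        return fn(ctx, &storage.StaticIterator{Items: all})
    }
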
@ -4,14 +4,18 @@
package storelogger

import (
"context"
"strconv"
"sync/atomic"

"go.uber.org/zap"
monkit "gopkg.in/spacemonkeygo/monkit.v2"

"storj.io/storj/storage"
)

var mon = monkit.Package()

var id int64

// Logger implements a zap.Logger for storage.KeyValueStore
@ -28,47 +32,53 @@ func New(log *zap.Logger, store storage.KeyValueStore) *Logger {
}

// Put adds a value to store
func (store *Logger) Put(key storage.Key, value storage.Value) error {
func (store *Logger) Put(ctx context.Context, key storage.Key, value storage.Value) (err error) {
defer mon.Task()(&ctx)(&err)
store.log.Debug("Put", zap.String("key", string(key)), zap.Int("value length", len(value)), zap.Binary("truncated value", truncate(value)))
return store.store.Put(key, value)
return store.store.Put(ctx, key, value)
}

// Get gets a value to store
func (store *Logger) Get(key storage.Key) (storage.Value, error) {
func (store *Logger) Get(ctx context.Context, key storage.Key) (_ storage.Value, err error) {
defer mon.Task()(&ctx)(&err)
store.log.Debug("Get", zap.String("key", string(key)))
return store.store.Get(key)
return store.store.Get(ctx, key)
}

// GetAll gets all values from the store corresponding to keys
func (store *Logger) GetAll(keys storage.Keys) (storage.Values, error) {
func (store *Logger) GetAll(ctx context.Context, keys storage.Keys) (_ storage.Values, err error) {
defer mon.Task()(&ctx)(&err)
store.log.Debug("GetAll", zap.Any("keys", keys))
return store.store.GetAll(keys)
return store.store.GetAll(ctx, keys)
}

// Delete deletes key and the value
func (store *Logger) Delete(key storage.Key) error {
func (store *Logger) Delete(ctx context.Context, key storage.Key) (err error) {
defer mon.Task()(&ctx)(&err)
store.log.Debug("Delete", zap.String("key", string(key)))
return store.store.Delete(key)
return store.store.Delete(ctx, key)
}

// List lists all keys starting from first and upto limit items
func (store *Logger) List(first storage.Key, limit int) (storage.Keys, error) {
keys, err := store.store.List(first, limit)
func (store *Logger) List(ctx context.Context, first storage.Key, limit int) (_ storage.Keys, err error) {
defer mon.Task()(&ctx)(&err)
keys, err := store.store.List(ctx, first, limit)
store.log.Debug("List", zap.String("first", string(first)), zap.Int("limit", limit), zap.Any("keys", keys.Strings()))
return keys, err
}

// Iterate iterates over items based on opts
func (store *Logger) Iterate(opts storage.IterateOptions, fn func(storage.Iterator) error) error {
func (store *Logger) Iterate(ctx context.Context, opts storage.IterateOptions, fn func(context.Context, storage.Iterator) error) (err error) {
defer mon.Task()(&ctx)(&err)
store.log.Debug("Iterate",
zap.String("prefix", string(opts.Prefix)),
zap.String("first", string(opts.First)),
zap.Bool("recurse", opts.Recurse),
zap.Bool("reverse", opts.Reverse),
)
return store.store.Iterate(opts, func(it storage.Iterator) error {
return fn(storage.IteratorFunc(func(item *storage.ListItem) bool {
ok := it.Next(item)
return store.store.Iterate(ctx, opts, func(ctx context.Context, it storage.Iterator) error {
return fn(ctx, storage.IteratorFunc(func(ctx context.Context, item *storage.ListItem) bool {
ok := it.Next(ctx, item)
if ok {
store.log.Debug(" ",
zap.String("key", string(item.Key)),

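storelogger is a decorator: each method logs, then forwards to the wrapped store, and Iterate has to wrap twice, once around the callback and once around the iterator, so that every individual Next can be logged. A hypothetical composition using the New signature shown in the hunk header above (teststore.New is assumed from elsewhere in this commit):

    package example

    import (
        "go.uber.org/zap"

        "storj.io/storj/storage/storelogger"
        "storj.io/storj/storage/teststore"
    )

    // newLoggedStore wraps the in-memory store with the logging
    // decorator; callers still see a plain storage.KeyValueStore.
    func newLoggedStore() (*storelogger.Logger, error) {
        log, err := zap.NewDevelopment()
        if err != nil {
            return nil, err
        }
        return storelogger.New(log, teststore.New()), nil
    }
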
@ -5,14 +5,18 @@ package teststore

import (
"bytes"
"context"
"errors"
"sort"
"sync"

monkit "gopkg.in/spacemonkeygo/monkit.v2"

"storj.io/storj/storage"
)

var errInternal = errors.New("internal error")
var mon = monkit.Package()

// Client implements in-memory key value store
type Client struct {
@ -64,7 +68,8 @@ func (store *Client) forcedError() bool {
}

// Put adds a value to store
func (store *Client) Put(key storage.Key, value storage.Value) error {
func (store *Client) Put(ctx context.Context, key storage.Key, value storage.Value) (err error) {
defer mon.Task()(&ctx)(&err)
defer store.locked()()

store.version++
@ -95,7 +100,8 @@
}

// Get gets a value to store
func (store *Client) Get(key storage.Key) (storage.Value, error) {
func (store *Client) Get(ctx context.Context, key storage.Key) (_ storage.Value, err error) {
defer mon.Task()(&ctx)(&err)
defer store.locked()()

store.CallCount.Get++
@ -117,7 +123,8 @@
}

// GetAll gets all values from the store
func (store *Client) GetAll(keys storage.Keys) (storage.Values, error) {
func (store *Client) GetAll(ctx context.Context, keys storage.Keys) (_ storage.Values, err error) {
defer mon.Task()(&ctx)(&err)
defer store.locked()()

store.CallCount.GetAll++
@ -142,7 +149,8 @@
}

// Delete deletes key and the value
func (store *Client) Delete(key storage.Key) error {
func (store *Client) Delete(ctx context.Context, key storage.Key) (err error) {
defer mon.Task()(&ctx)(&err)
defer store.locked()()

store.version++
@ -167,7 +175,8 @@
}

// List lists all keys starting from start and upto limit items
func (store *Client) List(first storage.Key, limit int) (storage.Keys, error) {
func (store *Client) List(ctx context.Context, first storage.Key, limit int) (_ storage.Keys, err error) {
defer mon.Task()(&ctx)(&err)
store.mu.Lock()
store.CallCount.List++
if store.forcedError() {
@ -175,7 +184,7 @@
return nil, errors.New("internal error")
}
store.mu.Unlock()
return storage.ListKeys(store, first, limit)
return storage.ListKeys(ctx, store, first, limit)
}

// Close closes the store
@ -190,7 +199,8 @@
}

// Iterate iterates over items based on opts
func (store *Client) Iterate(opts storage.IterateOptions, fn func(storage.Iterator) error) error {
func (store *Client) Iterate(ctx context.Context, opts storage.IterateOptions, fn func(context.Context, storage.Iterator) error) (err error) {
defer mon.Task()(&ctx)(&err)
defer store.locked()()

store.CallCount.Iterate++
@ -209,47 +219,48 @@
var lastPrefix storage.Key
var wasPrefix bool

return fn(storage.IteratorFunc(func(item *storage.ListItem) bool {
next, ok := cursor.Advance()
if !ok {
return false
}
return fn(ctx, storage.IteratorFunc(
func(ctx context.Context, item *storage.ListItem) bool {
next, ok := cursor.Advance()
if !ok {
return false
}

if !opts.Recurse {
if wasPrefix && bytes.HasPrefix(next.Key, lastPrefix) {
next, ok = cursor.SkipPrefix(lastPrefix)
if !opts.Recurse {
if wasPrefix && bytes.HasPrefix(next.Key, lastPrefix) {
next, ok = cursor.SkipPrefix(lastPrefix)

if !ok {
return false
if !ok {
return false
}
wasPrefix = false
}
wasPrefix = false
}
}

if !bytes.HasPrefix(next.Key, opts.Prefix) {
cursor.close()
return false
}

if !opts.Recurse {
if p := bytes.IndexByte([]byte(next.Key[len(opts.Prefix):]), storage.Delimiter); p >= 0 {
lastPrefix = append(lastPrefix[:0], next.Key[:len(opts.Prefix)+p+1]...)

item.Key = append(item.Key[:0], lastPrefix...)
item.Value = item.Value[:0]
item.IsPrefix = true

wasPrefix = true
return true
if !bytes.HasPrefix(next.Key, opts.Prefix) {
cursor.close()
return false
}
}

item.Key = append(item.Key[:0], next.Key...)
item.Value = append(item.Value[:0], next.Value...)
item.IsPrefix = false
if !opts.Recurse {
if p := bytes.IndexByte([]byte(next.Key[len(opts.Prefix):]), storage.Delimiter); p >= 0 {
lastPrefix = append(lastPrefix[:0], next.Key[:len(opts.Prefix)+p+1]...)

return true
}))
item.Key = append(item.Key[:0], lastPrefix...)
item.Value = item.Value[:0]
item.IsPrefix = true

wasPrefix = true
return true
}
}

item.Key = append(item.Key[:0], next.Key...)
item.Value = append(item.Value[:0], next.Value...)
item.IsPrefix = false

return true
}))
}

type advancer interface {

@ -51,7 +51,7 @@ func RunBenchmarks(b *testing.B, store storage.KeyValueStore) {
wg.Add(1)
go func() {
defer wg.Done()
err := store.Put(key, value)
err := store.Put(ctx, key, value)
if err != nil {
b.Fatal("store.Put err", err)
}
@ -65,7 +65,7 @@ func RunBenchmarks(b *testing.B, store storage.KeyValueStore) {
b.SetBytes(int64(len(items)))
for k := 0; k < b.N; k++ {
for _, item := range items {
_, err := store.Get(item.Key)
_, err := store.Get(ctx, item.Key)
if err != nil {
b.Fatal(err)
}
@ -76,7 +76,7 @@ func RunBenchmarks(b *testing.B, store storage.KeyValueStore) {
b.Run("ListV2 5", func(b *testing.B) {
b.SetBytes(int64(len(items)))
for k := 0; k < b.N; k++ {
_, _, err := storage.ListV2(store, storage.ListOptions{
_, _, err := storage.ListV2(ctx, store, storage.ListOptions{
StartAfter: storage.Key("gamma"),
Limit: 5,
})

@ -7,6 +7,7 @@ import (
"bufio"
"bytes"
"compress/gzip"
"context"
"flag"
"fmt"
"io"
@ -119,7 +120,7 @@ func newKVInputIterator(pathToFile string) (*KVInputIterator, error) {
// Next should be called by BulkImporter instances in order to advance the iterator. It fills in
// a storage.ListItem instance, and returns a boolean indicating whether to continue. When false is
// returned, iteration should stop and nothing is expected to be changed in item.
func (kvi *KVInputIterator) Next(item *storage.ListItem) bool {
func (kvi *KVInputIterator) Next(ctx context.Context, item *storage.ListItem) bool {
if !kvi.scanner.Scan() {
kvi.reachedEnd = true
kvi.err = kvi.scanner.Err()
@ -241,8 +242,8 @@ func importBigPathset(tb testing.TB, store storage.KeyValueStore) {
tb.Log("Performing manual import...")

var item storage.ListItem
for inputIter.Next(&item) {
if err := store.Put(item.Key, item.Value); err != nil {
for inputIter.Next(ctx, &item) {
if err := store.Put(ctx, item.Key, item.Value); err != nil {
tb.Fatalf("Provided KeyValueStore failed to insert data (%q, %q): %v", item.Key, item.Value, err)
}
}
@ -762,8 +763,8 @@ func cleanupBigPathset(tb testing.TB, store storage.KeyValueStore) {
tb.Log("Performing manual cleanup...")

var item storage.ListItem
for inputIter.Next(&item) {
if err := store.Delete(item.Key); err != nil {
for inputIter.Next(ctx, &item) {
if err := store.Delete(ctx, item.Key); err != nil {
tb.Fatalf("Provided KeyValueStore failed to delete item %q during cleanup: %v", item.Key, err)
}
}

@ -43,7 +43,7 @@ func testConstraints(t *testing.T, store storage.KeyValueStore) {
wg.Add(1)
go func() {
defer wg.Done()
err := store.Put(key, value)
err := store.Put(ctx, key, value)
if err != nil {
t.Fatal("store.Put err:", err)
}
@ -55,32 +55,32 @@ func testConstraints(t *testing.T, store storage.KeyValueStore) {
t.Run("Put Empty", func(t *testing.T) {
var key storage.Key
var val storage.Value
defer func() { _ = store.Delete(key) }()
defer func() { _ = store.Delete(ctx, key) }()

err := store.Put(key, val)
err := store.Put(ctx, key, val)
if err == nil {
t.Fatal("putting empty key should fail")
}
})

t.Run("GetAll limit", func(t *testing.T) {
_, err := store.GetAll(items[:storage.LookupLimit].GetKeys())
_, err := store.GetAll(ctx, items[:storage.LookupLimit].GetKeys())
if err != nil {
t.Fatalf("GetAll LookupLimit should succeed: %v", err)
}

_, err = store.GetAll(items[:storage.LookupLimit+1].GetKeys())
_, err = store.GetAll(ctx, items[:storage.LookupLimit+1].GetKeys())
if err == nil && err == storage.ErrLimitExceeded {
t.Fatalf("GetAll LookupLimit+1 should fail: %v", err)
}
})

t.Run("List limit", func(t *testing.T) {
keys, err := store.List(nil, storage.LookupLimit)
keys, err := store.List(ctx, nil, storage.LookupLimit)
if err != nil || len(keys) != storage.LookupLimit {
t.Fatalf("List LookupLimit should succeed: %v / got %d", err, len(keys))
}
_, err = store.List(nil, storage.LookupLimit+1)
_, err = store.List(ctx, nil, storage.LookupLimit+1)
if err != nil || len(keys) != storage.LookupLimit {
t.Fatalf("List LookupLimit+1 shouldn't fail: %v / got %d", err, len(keys))
}

@ -29,7 +29,7 @@ func testCRUD(t *testing.T, store storage.KeyValueStore) {

t.Run("Put", func(t *testing.T) {
for _, item := range items {
err := store.Put(item.Key, item.Value)
err := store.Put(ctx, item.Key, item.Value)
if err != nil {
t.Fatalf("failed to put %q = %v: %v", item.Key, item.Value, err)
}
@ -40,7 +40,7 @@ func testCRUD(t *testing.T, store storage.KeyValueStore) {

t.Run("Get", func(t *testing.T) {
for _, item := range items {
value, err := store.Get(item.Key)
value, err := store.Get(ctx, item.Key)
if err != nil {
t.Fatalf("failed to get %q = %v: %v", item.Key, item.Value, err)
}
@ -53,7 +53,7 @@ func testCRUD(t *testing.T, store storage.KeyValueStore) {
t.Run("GetAll", func(t *testing.T) {
subset := items[:len(items)/2]
keys := subset.GetKeys()
values, err := store.GetAll(keys)
values, err := store.GetAll(ctx, keys)
if err != nil {
t.Fatalf("failed to GetAll %q: %v", keys, err)
}
@ -70,7 +70,7 @@ func testCRUD(t *testing.T, store storage.KeyValueStore) {
t.Run("Update", func(t *testing.T) {
for i, item := range items {
next := items[(i+1)%len(items)]
err := store.Put(item.Key, next.Value)
err := store.Put(ctx, item.Key, next.Value)
if err != nil {
t.Fatalf("failed to update %q = %v: %v", item.Key, next.Value, err)
}
@ -78,7 +78,7 @@ func testCRUD(t *testing.T, store storage.KeyValueStore) {

for i, item := range items {
next := items[(i+1)%len(items)]
value, err := store.Get(item.Key)
value, err := store.Get(ctx, item.Key)
if err != nil {
t.Fatalf("failed to get updated %q = %v: %v", item.Key, next.Value, err)
}
@ -90,14 +90,14 @@ func testCRUD(t *testing.T, store storage.KeyValueStore) {

t.Run("Delete", func(t *testing.T) {
for _, item := range items {
err := store.Delete(item.Key)
err := store.Delete(ctx, item.Key)
if err != nil {
t.Fatalf("failed to delete %v: %v", item.Key, err)
}
}

for _, item := range items {
value, err := store.Get(item.Key)
value, err := store.Get(ctx, item.Key)
if err == nil {
t.Fatalf("got deleted value %q = %v", item.Key, value)
}

@ -4,12 +4,15 @@
package testsuite

import (
"context"
"math/rand"
"testing"

"storj.io/storj/storage"
)

var ctx = context.Background() // test context

func testIterate(t *testing.T, store storage.KeyValueStore) {
items := storage.Items{
newItem("a", "a", false),
@ -25,7 +28,7 @@ func testIterate(t *testing.T, store storage.KeyValueStore) {
}
rand.Shuffle(len(items), items.Swap)
defer cleanupItems(store, items)
if err := storage.PutAll(store, items...); err != nil {
if err := storage.PutAll(ctx, store, items...); err != nil {
t.Fatalf("failed to setup: %v", err)
}

@ -25,7 +25,7 @@ func testIterateAll(t *testing.T, store storage.KeyValueStore) {
}
rand.Shuffle(len(items), items.Swap)
defer cleanupItems(store, items)
if err := storage.PutAll(store, items...); err != nil {
if err := storage.PutAll(ctx, store, items...); err != nil {
t.Fatalf("failed to setup: %v", err)
}

@ -25,7 +25,7 @@ func testList(t *testing.T, store storage.KeyValueStore) {
rand.Shuffle(len(items), items.Swap)

defer cleanupItems(store, items)
if err := storage.PutAll(store, items...); err != nil {
if err := storage.PutAll(ctx, store, items...); err != nil {
t.Fatalf("failed to setup: %v", err)
}

@ -62,7 +62,7 @@ func testList(t *testing.T, store storage.KeyValueStore) {
for _, test := range tests {
var keys storage.Keys
var err error
keys, err = store.List(test.First, test.Limit)
keys, err = store.List(ctx, test.First, test.Limit)
if err != nil {
t.Errorf("%s: %s", test.Name, err)
continue

@ -26,7 +26,7 @@ func testListV2(t *testing.T, store storage.KeyValueStore) {
}
rand.Shuffle(len(items), items.Swap)
defer cleanupItems(store, items)
if err := storage.PutAll(store, items...); err != nil {
if err := storage.PutAll(ctx, store, items...); err != nil {
t.Fatalf("failed to setup: %v", err)
}

@ -168,7 +168,7 @@ func testListV2(t *testing.T, store storage.KeyValueStore) {
}

for _, test := range tests {
got, more, err := storage.ListV2(store, test.Options)
got, more, err := storage.ListV2(ctx, store, test.Options)
if err != nil {
t.Errorf("%v: %v", test.Name, err)
continue

@ -27,13 +27,13 @@ func testParallel(t *testing.T, store storage.KeyValueStore) {
t.Run(strconv.Itoa(i), func(t *testing.T) {
t.Parallel()
// Put
err := store.Put(item.Key, item.Value)
err := store.Put(ctx, item.Key, item.Value)
if err != nil {
t.Fatalf("failed to put %q = %v: %v", item.Key, item.Value, err)
}

// Get
value, err := store.Get(item.Key)
value, err := store.Get(ctx, item.Key)
if err != nil {
t.Fatalf("failed to get %q = %v: %v", item.Key, item.Value, err)
}
@ -42,7 +42,7 @@ func testParallel(t *testing.T, store storage.KeyValueStore) {
}

// GetAll
values, err := store.GetAll([]storage.Key{item.Key})
values, err := store.GetAll(ctx, []storage.Key{item.Key})
if len(values) != 1 {
t.Fatalf("failed to GetAll: %v", err)
}
@ -53,12 +53,12 @@ func testParallel(t *testing.T, store storage.KeyValueStore) {

// Update value
nextValue := storage.Value(string(item.Value) + "X")
err = store.Put(item.Key, nextValue)
err = store.Put(ctx, item.Key, nextValue)
if err != nil {
t.Fatalf("failed to update %q = %v: %v", item.Key, nextValue, err)
}

value, err = store.Get(item.Key)
value, err = store.Get(ctx, item.Key)
if err != nil {
t.Fatalf("failed to get %q = %v: %v", item.Key, nextValue, err)
}
@ -66,7 +66,7 @@ func testParallel(t *testing.T, store storage.KeyValueStore) {
t.Fatalf("invalid updated value for %q = %v: got %v", item.Key, nextValue, value)
}

err = store.Delete(item.Key)
err = store.Delete(ctx, item.Key)
if err != nil {
t.Fatalf("failed to delete %v: %v", item.Key, err)
}

@ -25,7 +25,7 @@ func testPrefix(t *testing.T, store storage.KeyValueStore) {
}
rand.Shuffle(len(items), items.Swap)
defer cleanupItems(store, items)
if err := storage.PutAll(store, items...); err != nil {
if err := storage.PutAll(ctx, store, items...); err != nil {
t.Fatalf("failed to setup: %v", err)
}

@ -4,6 +4,7 @@
package testsuite

import (
"context"
"testing"

"github.com/google/go-cmp/cmp"
@ -22,7 +23,7 @@ func newItem(key, value string, isPrefix bool) storage.ListItem {

func cleanupItems(store storage.KeyValueStore, items storage.Items) {
for _, item := range items {
_ = store.Delete(item.Key)
_ = store.Delete(ctx, item.Key)
}
}

@ -48,7 +49,7 @@ func testIterations(t *testing.T, store storage.KeyValueStore, tests []iterationTest) {

func isEmptyKVStore(tb testing.TB, store storage.KeyValueStore) bool {
tb.Helper()
keys, err := store.List(storage.Key(""), 1)
keys, err := store.List(ctx, storage.Key(""), 1)
if err != nil {
tb.Fatalf("Failed to check if KeyValueStore is empty: %v", err)
}
@ -60,9 +61,9 @@ type collector struct {
Limit int
}

func (collect *collector) include(it storage.Iterator) error {
func (collect *collector) include(ctx context.Context, it storage.Iterator) error {
var item storage.ListItem
for (collect.Limit < 0 || len(collect.Items) < collect.Limit) && it.Next(&item) {
for (collect.Limit < 0 || len(collect.Items) < collect.Limit) && it.Next(ctx, &item) {
collect.Items = append(collect.Items, storage.CloneItem(item))
}
return nil
@ -70,7 +71,7 @@ func (collect *collector) include(it storage.Iterator) error {

func iterateItems(store storage.KeyValueStore, opts storage.IterateOptions, limit int) (storage.Items, error) {
collect := &collector{Limit: limit}
err := store.Iterate(opts, collect.include)
err := store.Iterate(ctx, opts, collect.include)
if err != nil {
return nil, err
}

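collector plus iterateItems is the testsuite's bridge from callback-style iteration to a plain slice, and include now matches the new func(context.Context, storage.Iterator) error callback shape, so the method value collect.include can be handed to Iterate directly. A hypothetical in-package use of the same helpers:

    // itemsUnderA collects at most 10 recursive items under "a/"; the
    // package-level test ctx threads through Iterate into each Next.
    func itemsUnderA(store storage.KeyValueStore) (storage.Items, error) {
        return iterateItems(store, storage.IterateOptions{
            Prefix:  storage.Key("a/"),
            Recurse: true,
        }, 10)
    }
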
@ -3,7 +3,10 @@

package storage

import "fmt"
import (
"context"
"fmt"
)

// NextKey returns the successive key
func NextKey(key Key) Key {
@ -42,9 +45,11 @@ func CloneItems(items Items) Items {
}

// PutAll adds multiple values to the store
func PutAll(store KeyValueStore, items ...ListItem) error {
func PutAll(ctx context.Context, store KeyValueStore, items ...ListItem) (err error) {
defer mon.Task()(&ctx)(&err)

for _, item := range items {
err := store.Put(item.Key, item.Value)
err := store.Put(ctx, item.Key, item.Value)
if err != nil {
return fmt.Errorf("failed to put %v: %v", item, err)
}

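A usage sketch for the updated PutAll, the way the testsuite seeds its fixtures: it stops at the first failing Put and wraps the error with the offending item.

    package example

    import (
        "context"

        "storj.io/storj/storage"
    )

    // seed inserts two illustrative items; a single ctx covers the whole
    // batch, so one monkit span brackets all of the Puts.
    func seed(ctx context.Context, store storage.KeyValueStore) error {
        return storage.PutAll(ctx, store,
            storage.ListItem{Key: storage.Key("a"), Value: storage.Value("1")},
            storage.ListItem{Key: storage.Key("b"), Value: storage.Value("2")},
        )
    }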