Move pointerdb logic to a service (#1037)
* test * Move pointerdb logic to service * tset * reorder constructor params * restore field
This commit is contained in:
parent
2d53a33bc5
commit
cc1bdef8b7
@ -29,7 +29,8 @@ func TestQueryNoAgreements(t *testing.T) {
|
||||
ctx := testcontext.New(t)
|
||||
defer ctx.Cleanup()
|
||||
|
||||
pointerdb := pointerdb.NewServer(teststore.New(), &overlay.Cache{}, zap.NewNop(), pointerdb.Config{}, nil)
|
||||
service := pointerdb.NewService(zap.NewNop(), teststore.New())
|
||||
pointerdb := pointerdb.NewServer(zap.NewNop(), service, &overlay.Cache{}, pointerdb.Config{}, nil)
|
||||
overlayServer := mocks.NewOverlay([]*pb.Node{})
|
||||
db, err := satellitedb.NewInMemory()
|
||||
assert.NoError(t, err)
|
||||
@ -46,7 +47,8 @@ func TestQueryWithBw(t *testing.T) {
|
||||
ctx := testcontext.New(t)
|
||||
defer ctx.Cleanup()
|
||||
|
||||
pointerdb := pointerdb.NewServer(teststore.New(), &overlay.Cache{}, zap.NewNop(), pointerdb.Config{}, nil)
|
||||
service := pointerdb.NewService(zap.NewNop(), teststore.New())
|
||||
pointerdb := pointerdb.NewServer(zap.NewNop(), service, &overlay.Cache{}, pointerdb.Config{}, nil)
|
||||
overlayServer := mocks.NewOverlay([]*pb.Node{})
|
||||
|
||||
db, err := satellitedb.NewInMemory()
|
||||
|
@ -64,7 +64,9 @@ func (c Config) Run(ctx context.Context, server *provider.Provider) error {
|
||||
|
||||
cache := overlay.LoadFromContext(ctx)
|
||||
dblogged := storelogger.New(zap.L().Named("pdb"), db)
|
||||
s := NewServer(dblogged, cache, zap.L(), c, server.Identity())
|
||||
|
||||
service := NewService(zap.L(), dblogged)
|
||||
s := NewServer(zap.L(), service, cache, c, server.Identity())
|
||||
pb.RegisterPointerDBServer(server.GRPC(), s)
|
||||
// add the server to the context
|
||||
ctx = context.WithValue(ctx, ctxKey, s)
|
||||
|
@ -10,7 +10,6 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/golang/protobuf/ptypes"
|
||||
"github.com/skyrings/skyring-common/tools/uuid"
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap"
|
||||
@ -24,7 +23,6 @@ import (
|
||||
"storj.io/storj/pkg/peertls"
|
||||
pointerdbAuth "storj.io/storj/pkg/pointerdb/auth"
|
||||
"storj.io/storj/pkg/provider"
|
||||
"storj.io/storj/pkg/storage/meta"
|
||||
"storj.io/storj/storage"
|
||||
)
|
||||
|
||||
@ -35,21 +33,21 @@ var (
|
||||
|
||||
// Server implements the network state RPC service
|
||||
type Server struct {
|
||||
DB storage.KeyValueStore
|
||||
logger *zap.Logger
|
||||
config Config
|
||||
service *Service
|
||||
cache *overlay.Cache
|
||||
config Config
|
||||
identity *provider.FullIdentity
|
||||
}
|
||||
|
||||
// NewServer creates instance of Server
|
||||
func NewServer(db storage.KeyValueStore, cache *overlay.Cache, logger *zap.Logger, c Config, identity *provider.FullIdentity) *Server {
|
||||
func NewServer(logger *zap.Logger, service *Service, cache *overlay.Cache, config Config, identity *provider.FullIdentity) *Server {
|
||||
// TODO: reorder arguments
|
||||
return &Server{
|
||||
DB: db,
|
||||
logger: logger,
|
||||
config: c,
|
||||
service: service,
|
||||
cache: cache,
|
||||
config: config,
|
||||
identity: identity,
|
||||
}
|
||||
}
|
||||
@ -106,19 +104,7 @@ func (s *Server) Put(ctx context.Context, req *pb.PutRequest) (resp *pb.PutRespo
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Update the pointer with the creation date
|
||||
req.GetPointer().CreationDate = ptypes.TimestampNow()
|
||||
|
||||
pointerBytes, err := proto.Marshal(req.GetPointer())
|
||||
if err != nil {
|
||||
s.logger.Error("err marshaling pointer", zap.Error(err))
|
||||
return nil, status.Errorf(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
// TODO(kaloyan): make sure that we know we are overwriting the pointer!
|
||||
// In such case we should delete the pieces of the old segment if it was
|
||||
// a remote one.
|
||||
if err = s.DB.Put([]byte(req.GetPath()), pointerBytes); err != nil {
|
||||
if err = s.service.Put(req.GetPath(), req.GetPointer()); err != nil {
|
||||
s.logger.Error("err putting pointer", zap.Error(err))
|
||||
return nil, status.Errorf(codes.Internal, err.Error())
|
||||
}
|
||||
@ -134,7 +120,7 @@ func (s *Server) Get(ctx context.Context, req *pb.GetRequest) (resp *pb.GetRespo
|
||||
return nil, err
|
||||
}
|
||||
|
||||
pointerBytes, err := s.DB.Get([]byte(req.GetPath()))
|
||||
pointer, err := s.service.Get(req.GetPath())
|
||||
if err != nil {
|
||||
if storage.ErrKeyNotFound.Has(err) {
|
||||
return nil, status.Errorf(codes.NotFound, err.Error())
|
||||
@ -143,13 +129,6 @@ func (s *Server) Get(ctx context.Context, req *pb.GetRequest) (resp *pb.GetRespo
|
||||
return nil, status.Errorf(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
pointer := &pb.Pointer{}
|
||||
err = proto.Unmarshal(pointerBytes, pointer)
|
||||
if err != nil {
|
||||
s.logger.Error("Error unmarshaling pointer")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
pba, err := s.PayerBandwidthAllocation(ctx, &pb.PayerBandwidthAllocationRequest{Action: pb.PayerBandwidthAllocation_GET})
|
||||
if err != nil {
|
||||
s.logger.Error("err getting payer bandwidth allocation", zap.Error(err))
|
||||
@ -204,84 +183,14 @@ func (s *Server) List(ctx context.Context, req *pb.ListRequest) (resp *pb.ListRe
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var prefix storage.Key
|
||||
if req.Prefix != "" {
|
||||
prefix = storage.Key(req.Prefix)
|
||||
if prefix[len(prefix)-1] != storage.Delimiter {
|
||||
prefix = append(prefix, storage.Delimiter)
|
||||
}
|
||||
}
|
||||
|
||||
rawItems, more, err := storage.ListV2(s.DB, storage.ListOptions{
|
||||
Prefix: prefix, //storage.Key(req.Prefix),
|
||||
StartAfter: storage.Key(req.StartAfter),
|
||||
EndBefore: storage.Key(req.EndBefore),
|
||||
Recursive: req.Recursive,
|
||||
Limit: int(req.Limit),
|
||||
IncludeValue: req.MetaFlags != meta.None,
|
||||
})
|
||||
items, more, err := s.service.List(req.Prefix, req.StartAfter, req.EndBefore, req.Recursive, req.Limit, req.MetaFlags)
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "ListV2: %v", err)
|
||||
}
|
||||
|
||||
var items []*pb.ListResponse_Item
|
||||
for _, rawItem := range rawItems {
|
||||
items = append(items, s.createListItem(rawItem, req.MetaFlags))
|
||||
}
|
||||
|
||||
return &pb.ListResponse{Items: items, More: more}, nil
|
||||
}
|
||||
|
||||
// createListItem creates a new list item with the given path. It also adds
|
||||
// the metadata according to the given metaFlags.
|
||||
func (s *Server) createListItem(rawItem storage.ListItem, metaFlags uint32) *pb.ListResponse_Item {
|
||||
item := &pb.ListResponse_Item{
|
||||
Path: rawItem.Key.String(),
|
||||
IsPrefix: rawItem.IsPrefix,
|
||||
}
|
||||
if item.IsPrefix {
|
||||
return item
|
||||
}
|
||||
|
||||
err := s.setMetadata(item, rawItem.Value, metaFlags)
|
||||
if err != nil {
|
||||
s.logger.Warn("err retrieving metadata", zap.Error(err))
|
||||
}
|
||||
return item
|
||||
}
|
||||
|
||||
// getMetadata adds the metadata to the given item pointer according to the
|
||||
// given metaFlags
|
||||
func (s *Server) setMetadata(item *pb.ListResponse_Item, data []byte, metaFlags uint32) (err error) {
|
||||
if metaFlags == meta.None || len(data) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
pr := &pb.Pointer{}
|
||||
err = proto.Unmarshal(data, pr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Start with an empty pointer to and add only what's requested in
|
||||
// metaFlags to safe to transfer payload
|
||||
item.Pointer = &pb.Pointer{}
|
||||
if metaFlags&meta.Modified != 0 {
|
||||
item.Pointer.CreationDate = pr.GetCreationDate()
|
||||
}
|
||||
if metaFlags&meta.Expiration != 0 {
|
||||
item.Pointer.ExpirationDate = pr.GetExpirationDate()
|
||||
}
|
||||
if metaFlags&meta.Size != 0 {
|
||||
item.Pointer.SegmentSize = pr.GetSegmentSize()
|
||||
}
|
||||
if metaFlags&meta.UserDefined != 0 {
|
||||
item.Pointer.Metadata = pr.GetMetadata()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Delete formats and hands off a file path to delete from boltdb
|
||||
func (s *Server) Delete(ctx context.Context, req *pb.DeleteRequest) (resp *pb.DeleteResponse, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
@ -290,7 +199,7 @@ func (s *Server) Delete(ctx context.Context, req *pb.DeleteRequest) (resp *pb.De
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = s.DB.Delete([]byte(req.GetPath()))
|
||||
err = s.service.Delete(req.GetPath())
|
||||
if err != nil {
|
||||
s.logger.Error("err deleting path and pointer", zap.Error(err))
|
||||
return nil, status.Errorf(codes.Internal, err.Error())
|
||||
@ -307,13 +216,7 @@ func (s *Server) Iterate(ctx context.Context, req *pb.IterateRequest, f func(it
|
||||
return err
|
||||
}
|
||||
|
||||
opts := storage.IterateOptions{
|
||||
Prefix: storage.Key(req.Prefix),
|
||||
First: storage.Key(req.First),
|
||||
Recurse: req.Recurse,
|
||||
Reverse: req.Reverse,
|
||||
}
|
||||
return s.DB.Iterate(opts, f)
|
||||
return s.service.Iterate(req.Prefix, req.First, req.Recurse, req.Reverse, f)
|
||||
}
|
||||
|
||||
// PayerBandwidthAllocation returns PayerBandwidthAllocation struct, signed and with given action type
|
||||
|
@ -45,7 +45,8 @@ func TestServicePut(t *testing.T) {
|
||||
errTag := fmt.Sprintf("Test case #%d", i)
|
||||
|
||||
db := teststore.New()
|
||||
s := Server{DB: db, logger: zap.NewNop()}
|
||||
service := NewService(zap.NewNop(), db)
|
||||
s := Server{service: service, logger: zap.NewNop()}
|
||||
|
||||
path := "a/b/c"
|
||||
pr := pb.Pointer{}
|
||||
@ -93,7 +94,8 @@ func TestServiceGet(t *testing.T) {
|
||||
errTag := fmt.Sprintf("Test case #%d", i)
|
||||
|
||||
db := teststore.New()
|
||||
s := Server{DB: db, logger: zap.NewNop(), identity: identity}
|
||||
service := NewService(zap.NewNop(), db)
|
||||
s := Server{service: service, logger: zap.NewNop(), identity: identity}
|
||||
|
||||
path := "a/b/c"
|
||||
|
||||
@ -142,7 +144,8 @@ func TestServiceDelete(t *testing.T) {
|
||||
|
||||
db := teststore.New()
|
||||
_ = db.Put(storage.Key(path), storage.Value("hello"))
|
||||
s := Server{DB: db, logger: zap.NewNop()}
|
||||
service := NewService(zap.NewNop(), db)
|
||||
s := Server{service: service, logger: zap.NewNop()}
|
||||
|
||||
if tt.err != nil {
|
||||
db.ForceError++
|
||||
@ -161,7 +164,8 @@ func TestServiceDelete(t *testing.T) {
|
||||
|
||||
func TestServiceList(t *testing.T) {
|
||||
db := teststore.New()
|
||||
server := Server{DB: db, logger: zap.NewNop()}
|
||||
service := NewService(zap.NewNop(), db)
|
||||
server := Server{service: service, logger: zap.NewNop()}
|
||||
|
||||
pointer := &pb.Pointer{}
|
||||
pointer.CreationDate = ptypes.TimestampNow()
|
||||
|
158
pkg/pointerdb/service.go
Normal file
158
pkg/pointerdb/service.go
Normal file
@ -0,0 +1,158 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package pointerdb
|
||||
|
||||
import (
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/golang/protobuf/ptypes"
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/storage/meta"
|
||||
"storj.io/storj/storage"
|
||||
)
|
||||
|
||||
// Service structure
|
||||
type Service struct {
|
||||
logger *zap.Logger
|
||||
DB storage.KeyValueStore
|
||||
}
|
||||
|
||||
// NewService creates new pointerdb service
|
||||
func NewService(logger *zap.Logger, db storage.KeyValueStore) *Service {
|
||||
return &Service{logger: logger, DB: db}
|
||||
}
|
||||
|
||||
// Put puts pointer to db under specific path
|
||||
func (s *Service) Put(path string, pointer *pb.Pointer) (err error) {
|
||||
// Update the pointer with the creation date
|
||||
pointer.CreationDate = ptypes.TimestampNow()
|
||||
|
||||
pointerBytes, err := proto.Marshal(pointer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO(kaloyan): make sure that we know we are overwriting the pointer!
|
||||
// In such case we should delete the pieces of the old segment if it was
|
||||
// a remote one.
|
||||
if err = s.DB.Put([]byte(path), pointerBytes); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get gets pointer from db
|
||||
func (s *Service) Get(path string) (pointer *pb.Pointer, err error) {
|
||||
pointerBytes, err := s.DB.Get([]byte(path))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
pointer = &pb.Pointer{}
|
||||
err = proto.Unmarshal(pointerBytes, pointer)
|
||||
if err != nil {
|
||||
return nil, errs.New("error unmarshaling pointer: %v", err)
|
||||
}
|
||||
|
||||
return pointer, nil
|
||||
}
|
||||
|
||||
// List returns all Path keys in the pointers bucket
|
||||
func (s *Service) List(prefix string, startAfter string, endBefore string, recursive bool, limit int32,
|
||||
metaFlags uint32) (items []*pb.ListResponse_Item, more bool, err error) {
|
||||
|
||||
var prefixKey storage.Key
|
||||
if prefix != "" {
|
||||
prefixKey = storage.Key(prefix)
|
||||
if prefix[len(prefix)-1] != storage.Delimiter {
|
||||
prefixKey = append(prefixKey, storage.Delimiter)
|
||||
}
|
||||
}
|
||||
|
||||
rawItems, more, err := storage.ListV2(s.DB, storage.ListOptions{
|
||||
Prefix: prefixKey,
|
||||
StartAfter: storage.Key(startAfter),
|
||||
EndBefore: storage.Key(endBefore),
|
||||
Recursive: recursive,
|
||||
Limit: int(limit),
|
||||
IncludeValue: metaFlags != meta.None,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
for _, rawItem := range rawItems {
|
||||
items = append(items, s.createListItem(rawItem, metaFlags))
|
||||
}
|
||||
return items, more, nil
|
||||
}
|
||||
|
||||
// createListItem creates a new list item with the given path. It also adds
|
||||
// the metadata according to the given metaFlags.
|
||||
func (s *Service) createListItem(rawItem storage.ListItem, metaFlags uint32) *pb.ListResponse_Item {
|
||||
item := &pb.ListResponse_Item{
|
||||
Path: rawItem.Key.String(),
|
||||
IsPrefix: rawItem.IsPrefix,
|
||||
}
|
||||
if item.IsPrefix {
|
||||
return item
|
||||
}
|
||||
|
||||
err := s.setMetadata(item, rawItem.Value, metaFlags)
|
||||
if err != nil {
|
||||
s.logger.Warn("err retrieving metadata", zap.Error(err))
|
||||
}
|
||||
return item
|
||||
}
|
||||
|
||||
// getMetadata adds the metadata to the given item pointer according to the
|
||||
// given metaFlags
|
||||
func (s *Service) setMetadata(item *pb.ListResponse_Item, data []byte, metaFlags uint32) (err error) {
|
||||
if metaFlags == meta.None || len(data) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
pr := &pb.Pointer{}
|
||||
err = proto.Unmarshal(data, pr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Start with an empty pointer to and add only what's requested in
|
||||
// metaFlags to safe to transfer payload
|
||||
item.Pointer = &pb.Pointer{}
|
||||
if metaFlags&meta.Modified != 0 {
|
||||
item.Pointer.CreationDate = pr.GetCreationDate()
|
||||
}
|
||||
if metaFlags&meta.Expiration != 0 {
|
||||
item.Pointer.ExpirationDate = pr.GetExpirationDate()
|
||||
}
|
||||
if metaFlags&meta.Size != 0 {
|
||||
item.Pointer.SegmentSize = pr.GetSegmentSize()
|
||||
}
|
||||
if metaFlags&meta.UserDefined != 0 {
|
||||
item.Pointer.Metadata = pr.GetMetadata()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Delete deletes from item from db
|
||||
func (s *Service) Delete(path string) (err error) {
|
||||
return s.DB.Delete([]byte(path))
|
||||
}
|
||||
|
||||
// Iterate iterates over items in db
|
||||
func (s *Service) Iterate(prefix string, first string, recurse bool, reverse bool, f func(it storage.Iterator) error) (err error) {
|
||||
opts := storage.IterateOptions{
|
||||
Prefix: storage.Key(prefix),
|
||||
First: storage.Key(first),
|
||||
Recurse: recurse,
|
||||
Reverse: reverse,
|
||||
}
|
||||
return s.DB.Iterate(opts, f)
|
||||
}
|
@ -110,6 +110,7 @@ type Peer struct {
|
||||
|
||||
Metainfo struct {
|
||||
Database storage.KeyValueStore // TODO: move into pointerDB
|
||||
Service *pointerdb.Service
|
||||
Endpoint *pointerdb.Server
|
||||
}
|
||||
|
||||
@ -229,7 +230,8 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config *Config) (*
|
||||
}
|
||||
|
||||
peer.Metainfo.Database = storelogger.New(peer.Log.Named("pdb"), db)
|
||||
peer.Metainfo.Endpoint = pointerdb.NewServer(peer.Metainfo.Database, peer.Overlay.Service, peer.Log.Named("pointerdb"), config.PointerDB, peer.Identity)
|
||||
peer.Metainfo.Service = pointerdb.NewService(peer.Log.Named("pointerdb"), peer.Metainfo.Database)
|
||||
peer.Metainfo.Endpoint = pointerdb.NewServer(peer.Log.Named("pointerdb:endpoint"), peer.Metainfo.Service, peer.Overlay.Service, config.PointerDB, peer.Identity)
|
||||
pb.RegisterPointerDBServer(peer.Public.Server.GRPC(), peer.Metainfo.Endpoint)
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user