storj/pkg/pointerdb/pointerdb.go
Kaloyan Raev daf391e473
Refactor List in PointerDB (#163)
* Refactor List in Pointer DB

* Fix pointerdb-client example

* Fix issue in Path type related to empty paths

* Test for the PointerDB service with some fixes

* Fixed debug message in example: truncated --> more

* GoDoc comments for unexported methods

* TODO comment to check if Put is overwriting

* Log warning if protobuf timestamp cannot be converted

* TODO comment to make ListPageLimit configurable

* Rename 'segment' package to 'segments' to reflect folder name
2018-07-27 09:02:59 +03:00

292 lines
8.1 KiB
Go

// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package pointerdb
import (
"context"
"reflect"
"strings"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"storj.io/storj/pkg/paths"
meta "storj.io/storj/pkg/storage"
"storj.io/storj/pointerdb/auth"
pb "storj.io/storj/protos/pointerdb"
"storj.io/storj/storage"
)
// ListPageLimit is the maximum number of items that will be returned by a
// single list request. List also falls back to this value when the request
// asks for a non-positive limit or for more items than this.
// TODO(kaloyan): make it configurable
const ListPageLimit = 1000
// Server implements the pointerdb RPC service (Put, Get, List and Delete)
// on top of a key/value store.
type Server struct {
// DB is the key/value store that holds the marshaled pointers, keyed by path.
DB storage.KeyValueStore
// logger receives the debug, warning and error messages of the handlers.
logger *zap.Logger
}
// NewServer returns a Server that stores its pointers in db and writes its
// log messages to logger.
func NewServer(db storage.KeyValueStore, logger *zap.Logger) *Server {
	srv := new(Server)
	srv.DB = db
	srv.logger = logger
	return srv
}
// validateAuth checks APIKey with auth.ValidateAPIKey. It returns nil when the
// key is accepted; otherwise it logs the rejection and returns a gRPC error
// with code Unauthenticated.
//
// NOTE(review): grpc.Errorf is deprecated upstream in favor of status.Errorf;
// switching would also require dropping the then-unused grpc import.
func (s *Server) validateAuth(APIKey []byte) error {
	if auth.ValidateAPIKey(string(APIKey)) {
		return nil
	}

	unauthenticated := grpc.Errorf(codes.Unauthenticated, "Invalid API credential")
	s.logger.Error("unauthorized request: ", zap.Error(unauthenticated))
	return unauthenticated
}
// Put stores the pointer carried by the request under the request path in
// the key/value store.
//
// The API key is validated first. The pointer's CreationDate is then set to
// the current time, the pointer is marshaled with protobuf and handed to
// s.DB.Put.
//
// Returned errors:
//   - the validateAuth error when the API key is rejected;
//   - InvalidArgument when the request carries no pointer;
//   - Internal when marshaling or storing the pointer fails.
func (s *Server) Put(ctx context.Context, req *pb.PutRequest) (resp *pb.PutResponse, err error) {
	defer mon.Task()(&ctx)(&err)
	s.logger.Debug("entering pointerdb put")

	if err = s.validateAuth(req.GetAPIKey()); err != nil {
		return nil, err
	}

	// GetPointer returns nil for a request without a pointer; assigning the
	// creation date through it would panic, so reject such requests up front.
	pointer := req.GetPointer()
	if pointer == nil {
		return nil, status.Error(codes.InvalidArgument, "missing pointer")
	}

	// Update the pointer with the creation date
	pointer.CreationDate = ptypes.TimestampNow()

	pointerBytes, err := proto.Marshal(pointer)
	if err != nil {
		s.logger.Error("err marshaling pointer", zap.Error(err))
		// status.Error, not Errorf: the message is data, not a format string.
		return nil, status.Error(codes.Internal, err.Error())
	}

	// TODO(kaloyan): make sure that we know we are overwriting the pointer!
	// In such case we should delete the pieces of the old segment if it was
	// a remote one.
	if err = s.DB.Put([]byte(req.GetPath()), pointerBytes); err != nil {
		s.logger.Error("err putting pointer", zap.Error(err))
		return nil, status.Error(codes.Internal, err.Error())
	}
	s.logger.Debug("put to the db: " + string(req.GetPath()))

	return &pb.PutResponse{}, nil
}
// Get looks up the request path in the key/value store and returns the
// stored value.
//
// The API key is validated first. The bytes read from s.DB are placed in the
// response's Pointer field as they are; they are not unmarshaled here.
//
// Returned errors:
//   - the validateAuth error when the API key is rejected;
//   - Internal for any error reported by s.DB.Get.
func (s *Server) Get(ctx context.Context, req *pb.GetRequest) (resp *pb.GetResponse, err error) {
	defer mon.Task()(&ctx)(&err)
	s.logger.Debug("entering pointerdb get")

	if err = s.validateAuth(req.GetAPIKey()); err != nil {
		return nil, err
	}

	pointerBytes, err := s.DB.Get([]byte(req.GetPath()))
	if err != nil {
		s.logger.Error("err getting pointer", zap.Error(err))
		// status.Error, not Errorf: the message is data, not a format string,
		// and may contain '%' characters.
		return nil, status.Error(codes.Internal, err.Error())
	}

	return &pb.GetResponse{
		Pointer: pointerBytes,
	}, nil
}
// List returns one page of the paths stored under the request prefix.
//
// The API key is validated first. The requested limit is replaced by
// ListPageLimit when it is non-positive or larger than ListPageLimit. All
// keys under the prefix are then read from s.DB and filtered in memory:
// backwards from EndBefore when only EndBefore is set, forwards (optionally
// bounded by StartAfter and EndBefore) otherwise.
//
// The response carries the selected items and a More flag as reported by the
// key-processing helper.
//
// Returned errors:
//   - the validateAuth error when the API key is rejected;
//   - Internal for any error reported by s.DB.List.
func (s *Server) List(ctx context.Context, req *pb.ListRequest) (resp *pb.ListResponse, err error) {
	defer mon.Task()(&ctx)(&err)
	s.logger.Debug("entering pointerdb list")

	if err = s.validateAuth(req.GetAPIKey()); err != nil {
		return nil, err
	}

	limit := int(req.GetLimit())
	if limit <= 0 || limit > ListPageLimit {
		limit = ListPageLimit
	}

	prefix := paths.New(req.GetPrefix())

	// TODO(kaloyan): here we query the DB without limit. We must optimize it!
	keys, err := s.DB.List(prefix.Bytes(), 0)
	if err != nil {
		// Log the failure like the other handlers do.
		s.logger.Error("err listing path keys", zap.Error(err))
		// status.Error, not Errorf: the message is data, not a format string.
		return nil, status.Error(codes.Internal, err.Error())
	}

	var more bool
	var items []*pb.ListResponse_Item
	if req.GetEndBefore() != "" && req.GetStartAfter() == "" {
		items, more = s.processKeysBackwards(ctx, keys, prefix,
			req.GetEndBefore(), req.GetRecursive(), limit, req.GetMetaFlags())
	} else {
		items, more = s.processKeysForwards(ctx, keys, prefix, req.GetStartAfter(),
			req.GetEndBefore(), req.GetRecursive(), limit, req.GetMetaFlags())
	}

	s.logger.Debug("path keys retrieved")
	return &pb.ListResponse{Items: items, More: more}, nil
}
// processKeysForwards iterates forwards through the given keys and returns
// them as list items.
//
// Keys are skipped until the one equal to prefix+startAfter has been passed
// (when startAfter is set). Iteration stops at the key equal to
// prefix+endBefore (when endBefore is set) or at the first key that no longer
// starts with the prefix. In non-recursive mode, keys more than one segment
// below the prefix are left out.
//
// At most limit items are returned; a non-positive limit means no limit.
// more is true only when at least one further matching key exists beyond the
// returned items.
func (s *Server) processKeysForwards(ctx context.Context, keys storage.Keys,
	prefix paths.Path, startAfter, endBefore string, recursive bool, limit int,
	metaFlags uint64) (items []*pb.ListResponse_Item, more bool) {
	skip := startAfter != ""
	startAfterPath := prefix.Append(startAfter)
	endBeforePath := prefix.Append(endBefore)
	// The prefix does not change while iterating; render it once.
	prefixStr := prefix.String()

	for _, key := range keys {
		p := paths.New(string(key))

		if skip {
			if reflect.DeepEqual(p, startAfterPath) {
				// TODO(kaloyan): Better check - what if there is no path equal to startAfter?
				// TODO(kaloyan): Add Equal method in Path type
				skip = false
			}
			continue
		}

		// Only apply the end bound when one was requested.
		// NOTE(review): with an empty endBefore, prefix.Append presumably
		// yields the prefix itself, which would end the listing at a key equal
		// to the prefix - confirm against the paths package.
		// TODO(kaloyan): Better check - what if there is no path equal to endBefore?
		// TODO(kaloyan): Add Equal method in Path type
		if endBefore != "" && reflect.DeepEqual(p, endBeforePath) {
			break
		}

		// TODO(kaloyan): add HasPrefix method to Path type
		if !strings.HasPrefix(p.String(), prefixStr) {
			// We went through all keys that start with the prefix
			break
		}

		if !recursive && len(p) > len(prefix)+1 {
			continue
		}

		// The page is already full and this key would be one more item:
		// report it through more instead of claiming more on an exact fit.
		if limit > 0 && len(items) >= limit {
			more = true
			break
		}

		items = append(items, s.createListItem(ctx, p, metaFlags))
	}

	return items, more
}
// processKeysBackwards iterates backwards through the given keys and returns
// them as list items, in the same order as they appear in keys.
//
// Keys are skipped, starting from the end, until the one equal to
// prefix+endBefore has been passed (when endBefore is set). Iteration stops
// at the first key that does not start with the prefix. In non-recursive
// mode, keys more than one segment below the prefix are left out.
//
// At most limit items are returned; a non-positive limit means no limit.
// more is true only when at least one further matching key exists before the
// returned items.
func (s *Server) processKeysBackwards(ctx context.Context, keys storage.Keys,
	prefix paths.Path, endBefore string, recursive bool, limit int,
	metaFlags uint64) (items []*pb.ListResponse_Item, more bool) {
	skip := endBefore != ""
	endBeforePath := prefix.Append(endBefore)
	// The prefix does not change while iterating; render it once.
	prefixStr := prefix.String()

	for i := len(keys) - 1; i >= 0; i-- {
		p := paths.New(string(keys[i]))

		if skip {
			if reflect.DeepEqual(p, endBeforePath) {
				// TODO(kaloyan): Better check - what if there is no path equal to endBefore?
				// TODO(kaloyan): Add Equal method in Path type
				skip = false
			}
			continue
		}

		// TODO(kaloyan): add HasPrefix method to Path type
		if !strings.HasPrefix(p.String(), prefixStr) {
			// We went through all keys that start with the prefix
			break
		}

		if !recursive && len(p) > len(prefix)+1 {
			continue
		}

		// The page is already full and this key would be one more item:
		// report it through more instead of claiming more on an exact fit.
		if limit > 0 && len(items) >= limit {
			more = true
			break
		}

		// Collect in iteration (reverse) order; prepending here would copy
		// the whole slice for every item.
		items = append(items, s.createListItem(ctx, p, metaFlags))
	}

	// Restore the original key order with a single in-place reversal.
	for lo, hi := 0, len(items)-1; lo < hi; lo, hi = lo+1, hi-1 {
		items[lo], items[hi] = items[hi], items[lo]
	}

	return items, more
}
// createListItem builds the list item for path p and fills in the metadata
// selected by metaFlags. A failure to load the metadata is only logged as a
// warning; the item is returned either way.
func (s *Server) createListItem(ctx context.Context, p paths.Path,
	metaFlags uint64) *pb.ListResponse_Item {
	listItem := &pb.ListResponse_Item{
		Path: p.String(),
	}

	if metaErr := s.getMetadata(ctx, listItem, metaFlags); metaErr != nil {
		s.logger.Warn("err retrieving metadata", zap.Error(metaErr))
	}

	return listItem
}
// getMetadata copies the metadata selected by metaFlags from the stored
// pointer into item.
//
// With meta.MetaNone nothing is read and nil is returned. Otherwise the
// pointer stored under the item's path is fetched from s.DB and unmarshaled;
// an error from either step is returned unchanged. The MetaModified,
// MetaExpiration and MetaUserDefined bits select the creation date, the
// expiration date and the user metadata respectively.
func (s *Server) getMetadata(ctx context.Context, item *pb.ListResponse_Item,
	metaFlags uint64) (err error) {
	defer mon.Task()(&ctx)(&err)

	if metaFlags == meta.MetaNone {
		return nil
	}

	raw, err := s.DB.Get([]byte(item.GetPath()))
	if err != nil {
		return err
	}

	pointer := new(pb.Pointer)
	if err = proto.Unmarshal(raw, pointer); err != nil {
		return err
	}

	// TODO(kaloyan): revisit after clarifying how to store and serialize metadata
	if metaFlags&meta.MetaModified != 0 {
		item.CreationDate = pointer.GetCreationDate()
	}
	if metaFlags&meta.MetaExpiration != 0 {
		item.ExpirationDate = pointer.GetExpirationDate()
	}
	if metaFlags&meta.MetaUserDefined != 0 {
		item.Metadata = pointer.GetMetadata()
	}

	return nil
}
// Delete removes the entry stored under the request path from the key/value
// store.
//
// The API key is validated first; the path is then handed to s.DB.Delete.
//
// Returned errors:
//   - the validateAuth error when the API key is rejected;
//   - Internal for any error reported by s.DB.Delete.
func (s *Server) Delete(ctx context.Context, req *pb.DeleteRequest) (resp *pb.DeleteResponse, err error) {
	defer mon.Task()(&ctx)(&err)
	s.logger.Debug("entering pointerdb delete")

	if err = s.validateAuth(req.GetAPIKey()); err != nil {
		return nil, err
	}

	if err = s.DB.Delete([]byte(req.GetPath())); err != nil {
		s.logger.Error("err deleting path and pointer", zap.Error(err))
		// status.Error, not Errorf: the message is data, not a format string,
		// and may contain '%' characters.
		return nil, status.Error(codes.Internal, err.Error())
	}
	s.logger.Debug("deleted pointer at path: " + string(req.GetPath()))

	return &pb.DeleteResponse{}, nil
}