Streamstore impl of Stor interface (#191)

* first stab at PUT

* only PUT

* working on PUT

* Put with LimitReader

* start of Get

* reorder of files and proto meta

* working on Meta

* working on Meta

* add aware limit reader

* add size from segment put

* rm if for eof

* update to proto meta

* update gen proto file

* working on get

* working on get

* working on get

* working on list

* working on delete

* working on list

* working on meta method

* fix merge error and working on feedback from PR

* update to proto file

* rm size tuple

* mv eof limit reader to new file

* add toMeta

* rm varible names

* add updates from PR feedback

* updates from PR feedback

* updates from PR feedback

* add toMeta size based on total size

* update toMeta size calculation

* rm passthrough

* add default to config for segment size

* fix get method ranger bug

* add object support for nested stream proto

* rm nested stream meta data

* rm test for another PR
This commit is contained in:
James Hagans 2018-08-23 23:56:38 -04:00 committed by JT Olio
parent 3456a127cf
commit fdfa6e85c8
7 changed files with 366 additions and 92 deletions

View File

@ -20,7 +20,7 @@ import (
ecclient "storj.io/storj/pkg/storage/ec"
"storj.io/storj/pkg/storage/objects"
segment "storj.io/storj/pkg/storage/segments"
"storj.io/storj/pkg/storage/streams"
streams "storj.io/storj/pkg/storage/streams"
"storj.io/storj/pkg/transport"
)
@ -52,6 +52,7 @@ type ClientConfig struct {
APIKey string `help:"API Key (TODO: this needs to change to macaroons somehow)"`
MaxInlineSize int `help:"max inline segment size in bytes" default:"4096"`
SegmentSize int64 `help:"the size of a segment in bytes" default:"64000000"`
}
// Config is a general miniogw configuration struct. This should be everything
@ -150,9 +151,11 @@ func (c Config) NewGateway(ctx context.Context,
segments := segment.NewSegmentStore(oc, ec, pdb, rs, c.MaxInlineSize)
// TODO(jt): wrap segments and turn segments into streams actually
// TODO: passthrough is bad
stream := streams.NewPassthrough(segments)
// segment size 64MB
stream, err := streams.NewStreamStore(segments, c.SegmentSize)
if err != nil {
return nil, err
}
obj := objects.NewStore(stream)
return NewStorjGateway(buckets.NewStore(obj)), nil

View File

@ -0,0 +1,29 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package streams
import "io"
// EOFAwareLimitReader holds reader and status of EOF
type EOFAwareLimitReader struct {
reader io.Reader
eof bool
}
// EOFAwareReader keeps track of the state, has the internal reader reached EOF
func EOFAwareReader(r io.Reader) *EOFAwareLimitReader {
return &EOFAwareLimitReader{reader: r, eof: false}
}
func (r *EOFAwareLimitReader) Read(p []byte) (n int, err error) {
n, err = r.reader.Read(p)
if err == io.EOF {
r.eof = true
}
return n, err
}
func (r *EOFAwareLimitReader) isEOF() bool {
return r.eof
}

View File

@ -1,82 +0,0 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package streams
import (
"context"
"io"
"time"
"storj.io/storj/pkg/paths"
"storj.io/storj/pkg/ranger"
"storj.io/storj/pkg/storage/segments"
)
// Passthrough implementation of stream store
type Passthrough struct {
Segments segments.Store
}
// NewPassthrough stream store
func NewPassthrough(s segments.Store) *Passthrough {
return &Passthrough{Segments: s}
}
var _ Store = (*Passthrough)(nil)
// Meta implements Store.Meta
func (p *Passthrough) Meta(ctx context.Context, path paths.Path) (Meta, error) {
m, err := p.Segments.Meta(ctx, path)
return convertMeta(m), err
}
// Get implements Store.Get
func (p *Passthrough) Get(ctx context.Context, path paths.Path) (
ranger.RangeCloser, Meta, error) {
rr, m, err := p.Segments.Get(ctx, path)
return rr, convertMeta(m), err
}
// Put implements Store.Put
func (p *Passthrough) Put(ctx context.Context, path paths.Path, data io.Reader,
metadata []byte, expiration time.Time) (Meta, error) {
m, err := p.Segments.Put(ctx, path, data, metadata, expiration)
return convertMeta(m), err
}
// Delete implements Store.Delete
func (p *Passthrough) Delete(ctx context.Context, path paths.Path) error {
return p.Segments.Delete(ctx, path)
}
// List implements Store.List
func (p *Passthrough) List(ctx context.Context,
prefix, startAfter, endBefore paths.Path, recursive bool, limit int,
metaFlags uint32) (items []ListItem, more bool, err error) {
segItems, more, err := p.Segments.List(ctx, prefix, startAfter, endBefore,
recursive, limit, metaFlags)
if err != nil {
return nil, false, nil
}
items = make([]ListItem, len(segItems))
for i, itm := range segItems {
items[i] = ListItem{
Path: itm.Path,
Meta: convertMeta(itm.Meta),
}
}
return items, more, nil
}
// convertMeta converts segment metadata to stream metadata
func convertMeta(m segments.Meta) Meta {
return Meta{
Modified: m.Modified,
Expiration: m.Expiration,
Size: m.Size,
Data: m.Data,
}
}

View File

@ -5,13 +5,22 @@ package streams
import (
"context"
"fmt"
"io"
"time"
proto "github.com/gogo/protobuf/proto"
"github.com/zeebo/errs"
monkit "gopkg.in/spacemonkeygo/monkit.v2"
"storj.io/storj/pkg/paths"
"storj.io/storj/pkg/ranger"
ranger "storj.io/storj/pkg/ranger"
"storj.io/storj/pkg/storage/segments"
streamspb "storj.io/storj/protos/streams"
)
var mon = monkit.Package()
// Meta info about a segment
type Meta struct {
Modified time.Time
@ -20,13 +29,23 @@ type Meta struct {
Data []byte
}
// ListItem is a single item in a listing
type ListItem struct {
Path paths.Path
Meta Meta
// convertMeta converts segment metadata to stream metadata
func convertMeta(segmentMeta segments.Meta) (Meta, error) {
msi := streamspb.MetaStreamInfo{}
err := proto.Unmarshal(segmentMeta.Data, &msi)
if err != nil {
return Meta{}, err
}
return Meta{
Modified: segmentMeta.Modified,
Expiration: segmentMeta.Expiration,
Size: ((msi.NumberOfSegments - 1) * msi.SegmentsSize) + msi.LastSegmentSize,
Data: msi.Metadata,
}, nil
}
// Store for streams
// Store interface methods for streams to satisfy to be a store
type Store interface {
Meta(ctx context.Context, path paths.Path) (Meta, error)
Get(ctx context.Context, path paths.Path) (ranger.RangeCloser, Meta, error)
@ -37,3 +56,187 @@ type Store interface {
recursive bool, limit int, metaFlags uint32) (items []ListItem,
more bool, err error)
}
// streamStore is a store for streams
type streamStore struct {
segments segments.Store
segmentSize int64
}
// NewStreamStore stuff
func NewStreamStore(segments segments.Store, segmentSize int64) (Store, error) {
if segmentSize <= 0 {
return nil, errs.New("segment size must be larger than 0")
}
return &streamStore{segments: segments, segmentSize: segmentSize}, nil
}
// Put breaks up data as it comes in into s.segmentSize length pieces, then
// store the first piece at s0/<path>, second piece at s1/<path>, and the
// *last* piece at l/<path>. Store the given metadata, along with the number
// of segments, in a new protobuf, in the metadata of l/<path>.
func (s *streamStore) Put(ctx context.Context, path paths.Path, data io.Reader,
metadata []byte, expiration time.Time) (m Meta, err error) {
defer mon.Task()(&ctx)(&err)
var totalSegments int64
var totalSize int64
var lastSegmentSize int64
awareLimitReader := EOFAwareReader(data)
for !awareLimitReader.isEOF() {
segmentPath := path.Prepend(fmt.Sprintf("s%d", totalSegments))
segmentData := io.LimitReader(awareLimitReader, s.segmentSize)
putMeta, err := s.segments.Put(ctx, segmentPath, segmentData,
nil, expiration)
if err != nil {
return Meta{}, err
}
lastSegmentSize = putMeta.Size
totalSize = totalSize + putMeta.Size
totalSegments = totalSegments + 1
}
lastSegmentPath := path.Prepend("l")
md := streamspb.MetaStreamInfo{
NumberOfSegments: totalSegments,
SegmentsSize: s.segmentSize,
LastSegmentSize: lastSegmentSize,
Metadata: metadata,
}
lastSegmentMetadata, err := proto.Marshal(&md)
if err != nil {
return Meta{}, err
}
putMeta, err := s.segments.Put(ctx, lastSegmentPath, data,
lastSegmentMetadata, expiration)
if err != nil {
return Meta{}, err
}
totalSize = totalSize + putMeta.Size
resultMeta := Meta{
Modified: putMeta.Modified,
Expiration: expiration,
Size: totalSize,
Data: metadata,
}
return resultMeta, nil
}
// Get returns a ranger that knows what the overall size is (from l/<path>)
// and then returns the appropriate data from segments s0/<path>, s1/<path>,
// ..., l/<path>.
func (s *streamStore) Get(ctx context.Context, path paths.Path) (
rr ranger.RangeCloser, meta Meta, err error) {
defer mon.Task()(&ctx)(&err)
lastRangerCloser, lastSegmentMeta, err := s.segments.Get(ctx, path.Prepend("l"))
if err != nil {
return nil, Meta{}, err
}
msi := streamspb.MetaStreamInfo{}
err = proto.Unmarshal(lastSegmentMeta.Data, &msi)
if err != nil {
return nil, Meta{}, err
}
newMeta, err := convertMeta(lastSegmentMeta)
if err != nil {
return nil, Meta{}, err
}
var rangers []ranger.Ranger
for i := 0; i < int(msi.NumberOfSegments); i++ {
currentPath := fmt.Sprintf("s%d", i)
rangeCloser, _, err := s.segments.Get(ctx, path.Prepend(currentPath))
if err != nil {
return nil, Meta{}, err
}
rangers = append(rangers, rangeCloser)
}
rangers = append(rangers, lastRangerCloser)
catRangers := ranger.Concat(rangers...)
return ranger.NopCloser(catRangers), newMeta, nil
}
// Meta implements Store.Meta
func (s *streamStore) Meta(ctx context.Context, path paths.Path) (Meta, error) {
segmentMeta, err := s.segments.Meta(ctx, path.Prepend("l"))
if err != nil {
return Meta{}, err
}
meta, err := convertMeta(segmentMeta)
if err != nil {
return Meta{}, err
}
return meta, nil
}
// Delete all the segments, with the last one last
func (s *streamStore) Delete(ctx context.Context, path paths.Path) (err error) {
defer mon.Task()(&ctx)(&err)
lastSegmentMeta, err := s.segments.Meta(ctx, path.Prepend("l"))
if err != nil {
return err
}
msi := streamspb.MetaStreamInfo{}
err = proto.Unmarshal(lastSegmentMeta.Data, &msi)
if err != nil {
return err
}
for i := 0; i < int(msi.NumberOfSegments); i++ {
currentPath := fmt.Sprintf("s%d", i)
err := s.segments.Delete(ctx, path.Prepend(currentPath))
if err != nil {
return err
}
}
return s.segments.Delete(ctx, path.Prepend("l"))
}
// ListItem is a single item in a listing
type ListItem struct {
Path paths.Path
Meta Meta
}
// List all the paths inside l/, stripping off the l/ prefix
func (s *streamStore) List(ctx context.Context, prefix, startAfter, endBefore paths.Path,
recursive bool, limit int, metaFlags uint32) (items []ListItem,
more bool, err error) {
defer mon.Task()(&ctx)(&err)
lItems, more, err := s.segments.List(ctx, prefix.Prepend("l"), startAfter, endBefore, recursive, limit, metaFlags)
if err != nil {
return nil, false, err
}
items = make([]ListItem, len(lItems))
for i, item := range lItems {
newMeta, err := convertMeta(item.Meta)
if err != nil {
return nil, false, err
}
items[i] = ListItem{Path: item.Path[1:], Meta: newMeta}
}
return items, more, nil
}

6
protos/streams/gen.go Normal file
View File

@ -0,0 +1,6 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package streams
//go:generate protoc --go_out=plugins=grpc:. meta.proto

102
protos/streams/meta.pb.go Normal file
View File

@ -0,0 +1,102 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: meta.proto
package streams
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type MetaStreamInfo struct {
NumberOfSegments int64 `protobuf:"varint,1,opt,name=number_of_segments,json=numberOfSegments" json:"number_of_segments,omitempty"`
SegmentsSize int64 `protobuf:"varint,2,opt,name=segments_size,json=segmentsSize" json:"segments_size,omitempty"`
LastSegmentSize int64 `protobuf:"varint,3,opt,name=last_segment_size,json=lastSegmentSize" json:"last_segment_size,omitempty"`
Metadata []byte `protobuf:"bytes,4,opt,name=metadata,proto3" json:"metadata,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *MetaStreamInfo) Reset() { *m = MetaStreamInfo{} }
func (m *MetaStreamInfo) String() string { return proto.CompactTextString(m) }
func (*MetaStreamInfo) ProtoMessage() {}
func (*MetaStreamInfo) Descriptor() ([]byte, []int) {
return fileDescriptor_meta_1e6a51dbfd2db316, []int{0}
}
func (m *MetaStreamInfo) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_MetaStreamInfo.Unmarshal(m, b)
}
func (m *MetaStreamInfo) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_MetaStreamInfo.Marshal(b, m, deterministic)
}
func (dst *MetaStreamInfo) XXX_Merge(src proto.Message) {
xxx_messageInfo_MetaStreamInfo.Merge(dst, src)
}
func (m *MetaStreamInfo) XXX_Size() int {
return xxx_messageInfo_MetaStreamInfo.Size(m)
}
func (m *MetaStreamInfo) XXX_DiscardUnknown() {
xxx_messageInfo_MetaStreamInfo.DiscardUnknown(m)
}
var xxx_messageInfo_MetaStreamInfo proto.InternalMessageInfo
func (m *MetaStreamInfo) GetNumberOfSegments() int64 {
if m != nil {
return m.NumberOfSegments
}
return 0
}
func (m *MetaStreamInfo) GetSegmentsSize() int64 {
if m != nil {
return m.SegmentsSize
}
return 0
}
func (m *MetaStreamInfo) GetLastSegmentSize() int64 {
if m != nil {
return m.LastSegmentSize
}
return 0
}
func (m *MetaStreamInfo) GetMetadata() []byte {
if m != nil {
return m.Metadata
}
return nil
}
func init() {
proto.RegisterType((*MetaStreamInfo)(nil), "streams.MetaStreamInfo")
}
func init() { proto.RegisterFile("meta.proto", fileDescriptor_meta_1e6a51dbfd2db316) }
var fileDescriptor_meta_1e6a51dbfd2db316 = []byte{
// 165 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0xca, 0x4d, 0x2d, 0x49,
0xd4, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x2f, 0x2e, 0x29, 0x4a, 0x4d, 0xcc, 0x2d, 0x56,
0x5a, 0xcd, 0xc8, 0xc5, 0xe7, 0x9b, 0x5a, 0x92, 0x18, 0x0c, 0xe6, 0x7b, 0xe6, 0xa5, 0xe5, 0x0b,
0xe9, 0x70, 0x09, 0xe5, 0x95, 0xe6, 0x26, 0xa5, 0x16, 0xc5, 0xe7, 0xa7, 0xc5, 0x17, 0xa7, 0xa6,
0xe7, 0xa6, 0xe6, 0x95, 0x14, 0x4b, 0x30, 0x2a, 0x30, 0x6a, 0x30, 0x07, 0x09, 0x40, 0x64, 0xfc,
0xd3, 0x82, 0xa1, 0xe2, 0x42, 0xca, 0x5c, 0xbc, 0x30, 0x35, 0xf1, 0xc5, 0x99, 0x55, 0xa9, 0x12,
0x4c, 0x60, 0x85, 0x3c, 0x30, 0xc1, 0xe0, 0xcc, 0xaa, 0x54, 0x21, 0x2d, 0x2e, 0xc1, 0x9c, 0xc4,
0xe2, 0x12, 0x98, 0x69, 0x10, 0x85, 0xcc, 0x60, 0x85, 0xfc, 0x20, 0x09, 0xa8, 0x69, 0x60, 0xb5,
0x52, 0x5c, 0x1c, 0x20, 0x87, 0xa6, 0x24, 0x96, 0x24, 0x4a, 0xb0, 0x28, 0x30, 0x6a, 0xf0, 0x04,
0xc1, 0xf9, 0x49, 0x6c, 0x60, 0xd7, 0x1b, 0x03, 0x02, 0x00, 0x00, 0xff, 0xff, 0x9a, 0x56, 0xdb,
0x89, 0xcb, 0x00, 0x00, 0x00,
}

13
protos/streams/meta.proto Normal file
View File

@ -0,0 +1,13 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
syntax = "proto3";
package streams;
message MetaStreamInfo {
int64 number_of_segments = 1;
int64 segments_size = 2;
int64 last_segment_size = 3;
bytes metadata = 4;
}