From c8c0a42269aefbfe95f6f50d6ec1b29f7c3ff2ce Mon Sep 17 00:00:00 2001 From: Moby von Briesen Date: Tue, 2 Jun 2020 11:29:46 -0400 Subject: [PATCH] storagenode/orders: begin implementation of file store for order limits * Will replace order limits database. * This change adds functionality for storing and listing unsent orders. * The next change will add functionality for order archival after submission. Change-Id: Ic5e2abc63991513245b6851a968ff2f2e18ce48d --- storagenode/orders/store.go | 279 +++++++++++++++++++++++++++++++ storagenode/orders/store_test.go | 155 +++++++++++++++++ 2 files changed, 434 insertions(+) create mode 100644 storagenode/orders/store.go create mode 100644 storagenode/orders/store_test.go diff --git a/storagenode/orders/store.go b/storagenode/orders/store.go new file mode 100644 index 000000000..967ecc3d3 --- /dev/null +++ b/storagenode/orders/store.go @@ -0,0 +1,279 @@ +// Copyright (C) 2020 Storj Labs, Inc. +// See LICENSE for copying information. + +package orders + +import ( + "encoding/binary" + "io" + "os" + "path/filepath" + "strconv" + "sync" + "time" + + "github.com/zeebo/errs" + + "storj.io/common/pb" + "storj.io/common/storj" +) + +const ( + unsentStagingFileName = "unsent-orders-staging" + unsentReadyFilePrefix = "unsent-orders-ready-" +) + +// FileStore implements the orders.Store interface by appending orders to flat files. +type FileStore struct { + ordersDir string + unsentDir string + archiveDir string + mu sync.Mutex +} + +// NewFileStore creates a new orders file store. +func NewFileStore(ordersDir string) *FileStore { + return &FileStore{ + ordersDir: ordersDir, + unsentDir: filepath.Join(ordersDir, "unsent"), + archiveDir: filepath.Join(ordersDir, "archive"), + } +} + +// Enqueue inserts order to the list of orders needing to be sent to the satellite. +func (store *FileStore) Enqueue(info *Info) (err error) { + store.mu.Lock() + defer store.mu.Unlock() + + f, err := store.getUnsentStagingFile() + if err != nil { + return OrderError.Wrap(err) + } + defer func() { + err = errs.Combine(err, f.Close()) + }() + + err = writeLimit(f, info.Limit) + if err != nil { + return err + } + err = writeOrder(f, info.Order) + if err != nil { + return err + } + return nil +} + +// ListUnsentBySatellite returns orders that haven't been sent yet grouped by satellite. +// It copies the staging file to a read-only "ready to send" file first. +// It should never be called concurrently with DeleteReadyToSendFiles. +func (store *FileStore) ListUnsentBySatellite() (infoMap map[storj.NodeID][]*Info, err error) { + err = store.convertUnsentStagingToReady() + if err != nil { + return infoMap, err + } + + var errList error + infoMap = make(map[storj.NodeID][]*Info) + + err = filepath.Walk(store.unsentDir, func(path string, info os.FileInfo, err error) error { + if err != nil { + errList = errs.Combine(errList, err) + return nil + } + if info.IsDir() { + return nil + } + if info.Name() == unsentStagingFileName { + return nil + } + + f, err := os.Open(path) + if err != nil { + return OrderError.Wrap(err) + } + defer func() { + err = errs.Combine(err, f.Close()) + }() + + for { + limit, err := readLimit(f) + if err != nil { + if errs.Is(err, io.EOF) { + break + } + return err + } + order, err := readOrder(f) + if err != nil { + return err + } + + newInfo := &Info{ + Limit: limit, + Order: order, + } + infoList := infoMap[limit.SatelliteId] + infoList = append(infoList, newInfo) + infoMap[limit.SatelliteId] = infoList + } + return nil + }) + if err != nil { + errList = errs.Combine(errList, err) + } + + return infoMap, errList +} + +// DeleteReadyToSendFiles deletes all non-staging files in the "unsent" directory. +// It should be called after the order limits have been sent. +// It should never be called concurrently with ListUnsentBySatellite. +func (store *FileStore) DeleteReadyToSendFiles() (err error) { + var errList error + + err = filepath.Walk(store.unsentDir, func(path string, info os.FileInfo, err error) error { + if err != nil { + errList = errs.Combine(errList, OrderError.Wrap(err)) + return nil + } + if info.IsDir() { + return nil + } + if info.Name() == unsentStagingFileName { + return nil + } + // delete all non-staging files + return OrderError.Wrap(os.Remove(path)) + }) + if err != nil { + errList = errs.Combine(errList, err) + } + + return errList +} + +// convertUnsentStagingToReady converts the unsent staging file to be read only, and renames it. +func (store *FileStore) convertUnsentStagingToReady() error { + // lock mutex so no one tries to write to the file while we do this + store.mu.Lock() + defer store.mu.Unlock() + + oldFileName := unsentStagingFileName + oldFilePath := filepath.Join(store.unsentDir, oldFileName) + if _, err := os.Stat(oldFilePath); os.IsNotExist(err) { + return nil + } + + // set file to readonly + err := os.Chmod(oldFilePath, 0444) + if err != nil { + return err + } + // make new file suffix the current time in case there are other "ready" files already + timeStr := strconv.FormatInt(time.Now().UnixNano(), 10) + newFilePath := filepath.Join(store.unsentDir, unsentReadyFilePrefix+timeStr) + // rename file + return os.Rename(oldFilePath, newFilePath) +} + +// getUnsentStagingFile creates or gets the order limit file for appending unsent orders to. +// it expects the caller to lock the store's mutex before calling, and to handle closing the returned file. +func (store *FileStore) getUnsentStagingFile() (*os.File, error) { + if _, err := os.Stat(store.unsentDir); os.IsNotExist(err) { + err = os.Mkdir(store.unsentDir, 0700) + if err != nil { + return nil, OrderError.Wrap(err) + } + } + + fileName := unsentStagingFileName + filePath := filepath.Join(store.unsentDir, fileName) + // create file if not exists or append + f, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return nil, OrderError.Wrap(err) + } + return f, nil +} + +// writeLimit writes the size of the order limit bytes, followed by the order limit bytes. +// it expects the caller to have locked the mutex. +func writeLimit(f io.Writer, limit *pb.OrderLimit) error { + limitSerialized, err := pb.Marshal(limit) + if err != nil { + return OrderError.Wrap(err) + } + + sizeBytes := [4]byte{} + binary.LittleEndian.PutUint32(sizeBytes[:], uint32(len(limitSerialized))) + if _, err = f.Write(sizeBytes[:]); err != nil { + return OrderError.New("Error writing serialized limit size: %w", err) + } + + if _, err = f.Write(limitSerialized); err != nil { + return OrderError.New("Error writing serialized limit: %w", err) + } + return nil +} + +// readLimit reads the size of the limit followed by the serialized limit, and returns the unmarshalled limit. +func readLimit(f io.Reader) (*pb.OrderLimit, error) { + sizeBytes := [4]byte{} + _, err := io.ReadFull(f, sizeBytes[:]) + if err != nil { + return nil, OrderError.Wrap(err) + } + limitSize := binary.LittleEndian.Uint32(sizeBytes[:]) + limitSerialized := make([]byte, limitSize) + _, err = io.ReadFull(f, limitSerialized) + if err != nil { + return nil, OrderError.Wrap(err) + } + limit := &pb.OrderLimit{} + err = pb.Unmarshal(limitSerialized, limit) + if err != nil { + return nil, OrderError.Wrap(err) + } + return limit, nil +} + +// writeOrder writes the size of the order bytes, followed by the order bytes. +// it expects the caller to have locked the mutex. +func writeOrder(f io.Writer, order *pb.Order) error { + orderSerialized, err := pb.Marshal(order) + if err != nil { + return OrderError.Wrap(err) + } + + sizeBytes := [4]byte{} + binary.LittleEndian.PutUint32(sizeBytes[:], uint32(len(orderSerialized))) + if _, err = f.Write(sizeBytes[:]); err != nil { + return OrderError.New("Error writing serialized order size: %w", err) + } + if _, err = f.Write(orderSerialized); err != nil { + return OrderError.New("Error writing serialized order: %w", err) + } + return nil +} + +// readOrder reads the size of the order followed by the serialized order, and returns the unmarshalled order. +func readOrder(f io.Reader) (*pb.Order, error) { + sizeBytes := [4]byte{} + _, err := io.ReadFull(f, sizeBytes[:]) + if err != nil { + return nil, OrderError.Wrap(err) + } + orderSize := binary.LittleEndian.Uint32(sizeBytes[:]) + orderSerialized := make([]byte, orderSize) + _, err = io.ReadFull(f, orderSerialized) + if err != nil { + return nil, OrderError.Wrap(err) + } + order := &pb.Order{} + err = pb.Unmarshal(orderSerialized, order) + if err != nil { + return nil, OrderError.Wrap(err) + } + return order, nil +} diff --git a/storagenode/orders/store_test.go b/storagenode/orders/store_test.go new file mode 100644 index 000000000..d1c27af3e --- /dev/null +++ b/storagenode/orders/store_test.go @@ -0,0 +1,155 @@ +// Copyright (C) 2020 Storj Labs, Inc. +// See LICENSE for copying information. + +package orders_test + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + + "storj.io/common/pb" + "storj.io/common/storj" + "storj.io/common/testcontext" + "storj.io/common/testrand" + "storj.io/storj/storagenode/orders" +) + +func TestOrdersStore(t *testing.T) { + ctx := testcontext.New(t) + defer ctx.Cleanup() + dirName := ctx.Dir("test-orders") + + ordersStore := orders.NewFileStore(dirName) + + numSatellites := 3 + serialsPerSat := 5 + originalInfos, err := storeNewOrders(ordersStore, numSatellites, serialsPerSat) + require.NoError(t, err) + + unsentMap, err := ordersStore.ListUnsentBySatellite() + require.NoError(t, err) + + // go through order limits and make sure information is accurate + require.Len(t, unsentMap, numSatellites) + for _, unsentSatList := range unsentMap { + require.Len(t, unsentSatList, serialsPerSat) + + for _, unsentInfo := range unsentSatList { + sn := unsentInfo.Limit.SerialNumber + originalInfo := originalInfos[sn] + + verifyInfosEqual(t, unsentInfo, originalInfo) + } + } + + // add some more orders and list again + // we should see everything we added before, plus the new ones + newInfos, err := storeNewOrders(ordersStore, numSatellites, serialsPerSat) + require.NoError(t, err) + for sn, info := range newInfos { + originalInfos[sn] = info + } + // because we have stored two times, we have twice as many serials... + require.Len(t, originalInfos, 2*numSatellites*serialsPerSat) + + unsentMap, err = ordersStore.ListUnsentBySatellite() + require.NoError(t, err) + // ...and twice as many satellites. + require.Len(t, unsentMap, 2*numSatellites) + for _, unsentSatList := range unsentMap { + require.Len(t, unsentSatList, serialsPerSat) + + for _, unsentInfo := range unsentSatList { + sn := unsentInfo.Limit.SerialNumber + originalInfo := originalInfos[sn] + + verifyInfosEqual(t, unsentInfo, originalInfo) + } + } + + // now, add another order, delete ready to send files, and list + // we should only see the new order + originalInfos, err = storeNewOrders(ordersStore, 1, 1) + require.NoError(t, err) + + err = ordersStore.DeleteReadyToSendFiles() + require.NoError(t, err) + + unsentMap, err = ordersStore.ListUnsentBySatellite() + require.NoError(t, err) + require.Len(t, unsentMap, 1) + for _, unsentSatList := range unsentMap { + require.Len(t, unsentSatList, 1) + for _, unsentInfo := range unsentSatList { + sn := unsentInfo.Limit.SerialNumber + originalInfo := originalInfos[sn] + + verifyInfosEqual(t, unsentInfo, originalInfo) + } + } + + // TODO test order archival +} + +func verifyInfosEqual(t *testing.T, a, b *orders.Info) { + t.Helper() + + require.NotNil(t, a) + require.NotNil(t, b) + + require.Equal(t, a.Limit.SerialNumber, b.Limit.SerialNumber) + require.Equal(t, a.Limit.SatelliteId, b.Limit.SatelliteId) + require.Equal(t, a.Limit.OrderExpiration.UTC(), b.Limit.OrderExpiration.UTC()) + require.Equal(t, a.Limit.Action, b.Limit.Action) + + require.Equal(t, a.Order.Amount, b.Order.Amount) + require.Equal(t, a.Order.SerialNumber, b.Order.SerialNumber) + +} + +func storeNewOrders(ordersStore *orders.FileStore, numSatellites, numOrdersPerSatellite int) (map[storj.SerialNumber]*orders.Info, error) { + actions := []pb.PieceAction{ + pb.PieceAction_GET, + pb.PieceAction_PUT_REPAIR, + pb.PieceAction_GET_AUDIT, + } + originalInfos := make(map[storj.SerialNumber]*orders.Info) + for i := 0; i < numSatellites; i++ { + satellite := testrand.NodeID() + + // for each satellite, half of the orders will expire in an hour + // and half will expire in three hours. + for j := 0; j < numOrdersPerSatellite; j++ { + expiration := time.Now().Add(time.Hour) + if j < 2 { + expiration = time.Now().Add(3 * time.Hour) + } + amount := testrand.Int63n(1000) + sn := testrand.SerialNumber() + action := actions[j%len(actions)] + + newInfo := &orders.Info{ + Limit: &pb.OrderLimit{ + SerialNumber: sn, + SatelliteId: satellite, + Action: action, + OrderExpiration: expiration, + }, + Order: &pb.Order{ + SerialNumber: sn, + Amount: amount, + }, + } + originalInfos[sn] = newInfo + + // store the new info in the orders store + err := ordersStore.Enqueue(newInfo) + if err != nil { + return originalInfos, err + } + } + } + return originalInfos, nil +}