diff --git a/go.mod b/go.mod index a18206a99..1e9b62851 100644 --- a/go.mod +++ b/go.mod @@ -45,7 +45,7 @@ require ( golang.org/x/tools v0.0.0-20200923182640-463111b69878 // indirect google.golang.org/api v0.20.0 // indirect google.golang.org/protobuf v1.25.0 // indirect - storj.io/common v0.0.0-20210202120805-a5a4cfd90efa + storj.io/common v0.0.0-20210203121719-6c48157d3f5f storj.io/drpc v0.0.16 storj.io/monkit-jaeger v0.0.0-20200518165323-80778fc3f91b storj.io/private v0.0.0-20210120150301-bd3ac3e989f0 diff --git a/go.sum b/go.sum index dcb0fff9f..a881388e0 100644 --- a/go.sum +++ b/go.sum @@ -934,8 +934,8 @@ storj.io/common v0.0.0-20200424175742-65ac59022f4f/go.mod h1:pZyXiIE7bGETIRXtfs0 storj.io/common v0.0.0-20201026135900-1aaeec90670b/go.mod h1:GqdmNf3fLm2UZX/7Zr0BLFCJ4gFjgm6eHrk/fnmr5jQ= storj.io/common v0.0.0-20210119231202-8321551aa24d h1:lOLCRtsKISuZlK2lBI5O0uBAc44mp/yO3CtUTXNNSUc= storj.io/common v0.0.0-20210119231202-8321551aa24d/go.mod h1:KhVByBTvjV2rsaUQsft0pKgBRRMvCcY1JsDqt6BWr3I= -storj.io/common v0.0.0-20210202120805-a5a4cfd90efa h1:MkGCzbHxlmbZNmRxxLNnS4RUxKHhNEDFDsqsLChFnq4= -storj.io/common v0.0.0-20210202120805-a5a4cfd90efa/go.mod h1:KhVByBTvjV2rsaUQsft0pKgBRRMvCcY1JsDqt6BWr3I= +storj.io/common v0.0.0-20210203121719-6c48157d3f5f h1:k0XvINvUag6E3v58QmknmvpgQMBPPNlC9OCUC537XcI= +storj.io/common v0.0.0-20210203121719-6c48157d3f5f/go.mod h1:KhVByBTvjV2rsaUQsft0pKgBRRMvCcY1JsDqt6BWr3I= storj.io/drpc v0.0.11/go.mod h1:TiFc2obNjL9/3isMW1Rpxjy8V9uE0B2HMeMFGiiI7Iw= storj.io/drpc v0.0.14/go.mod h1:82nfl+6YwRwF6UG31cEWWUqv/FaKvP5SGqUvoqTxCMA= storj.io/drpc v0.0.16 h1:9sxypc5lKi/0D69cR21BR0S21+IvXfON8L5nXMVNTwQ= diff --git a/satellite/orders/endpoint.go b/satellite/orders/endpoint.go index 147fa9db3..66e30b784 100644 --- a/satellite/orders/endpoint.go +++ b/satellite/orders/endpoint.go @@ -175,16 +175,6 @@ func NewEndpoint(log *zap.Logger, satelliteSignee signing.Signee, db DB, nodeAPI } } -// Settlement receives orders and handles them in batches. -// -// Deprecated: an error is always returned to the client. -func (endpoint *Endpoint) Settlement(stream pb.DRPCOrders_SettlementStream) (err error) { - ctx := stream.Context() - defer mon.Task()(&ctx)(&err) - - return rpcstatus.Error(rpcstatus.Unavailable, "deprecated endpoint") -} - type bucketIDAction struct { bucketname string projectID uuid.UUID diff --git a/satellite/orders/endpoint_test.go b/satellite/orders/endpoint_test.go index c7ac2ad9d..c36637803 100644 --- a/satellite/orders/endpoint_test.go +++ b/satellite/orders/endpoint_test.go @@ -4,14 +4,12 @@ package orders_test import ( - "io" "testing" "time" "github.com/stretchr/testify/require" "storj.io/common/pb" - "storj.io/common/rpc/rpcstatus" "storj.io/common/signing" "storj.io/common/storj" "storj.io/common/testcontext" @@ -428,96 +426,3 @@ func TestSettlementWithWindowEndpointErrors(t *testing.T) { } }) } - -func TestSettlementEndpointSingleOrder(t *testing.T) { - testplanet.Run(t, testplanet.Config{ - SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 1, - }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { - const dataAmount int64 = 50 - satellite := planet.Satellites[0] - ordersDB := satellite.Orders.DB - storagenode := planet.StorageNodes[0] - now := time.Now() - projectID := testrand.UUID() - bucketname := "testbucket" - bucketLocation := metabase.BucketLocation{ - ProjectID: projectID, - BucketName: bucketname, - } - // stop the async flush because we want to be sure when some values are - // written to avoid races - satellite.Orders.Chore.Loop.Pause() - - // confirm storagenode and bucket bandwidth tables start empty - snbw, err := ordersDB.GetStorageNodeBandwidth(ctx, satellite.ID(), time.Time{}, now) - require.NoError(t, err) - require.EqualValues(t, 0, snbw) - - bucketbw, err := ordersDB.GetBucketBandwidth(ctx, projectID, []byte(bucketname), time.Time{}, now) - require.NoError(t, err) - require.EqualValues(t, 0, bucketbw) - - piecePublicKey, piecePrivateKey, err := storj.NewPieceKey() - require.NoError(t, err) - - serialNumber := testrand.SerialNumber() - key := satellite.Config.Orders.EncryptionKeys.Default - encrypted, err := key.EncryptMetadata( - serialNumber, - &internalpb.OrderLimitMetadata{ - CompactProjectBucketPrefix: bucketLocation.CompactPrefix(), - }, - ) - require.NoError(t, err) - - // create signed orderlimit or order to test with - limit := &pb.OrderLimit{ - SerialNumber: serialNumber, - SatelliteId: satellite.ID(), - UplinkPublicKey: piecePublicKey, - StorageNodeId: storagenode.ID(), - PieceId: storj.NewPieceID(), - Action: pb.PieceAction_PUT, - Limit: 1000, - PieceExpiration: time.Time{}, - OrderCreation: now, - OrderExpiration: now.Add(24 * time.Hour), - EncryptedMetadataKeyId: key.ID[:], - EncryptedMetadata: encrypted, - } - orderLimit, err := signing.SignOrderLimit(ctx, signing.SignerFromFullIdentity(satellite.Identity), limit) - require.NoError(t, err) - - order, err := signing.SignUplinkOrder(ctx, piecePrivateKey, &pb.Order{ - SerialNumber: serialNumber, - Amount: dataAmount, - }) - require.NoError(t, err) - - // create connection between storagenode and satellite - conn, err := storagenode.Dialer.DialNodeURL(ctx, storj.NodeURL{ID: satellite.ID(), Address: satellite.Addr()}) - require.NoError(t, err) - defer ctx.Check(conn.Close) - - stream, err := pb.NewDRPCOrdersClient(conn).Settlement(ctx) - require.NoError(t, err) - defer ctx.Check(stream.Close) - - // in phase2 and phase3, the endpoint was disabled. depending on how fast the - // server sends that error message, we may see an io.EOF on the Send call, or - // we may see no error at all. In either case, we have to call stream.Recv to - // see the actual error. gRPC semantics are funky. - err = stream.Send(&pb.SettlementRequest{ - Limit: orderLimit, - Order: order, - }) - if err != io.EOF { - require.NoError(t, err) - } - require.NoError(t, stream.CloseSend()) - - _, err = stream.Recv() - require.Error(t, err) - require.Equal(t, rpcstatus.Unavailable, rpcstatus.Code(err)) - }) -} diff --git a/storagenode/orders/service.go b/storagenode/orders/service.go index a3fbd9f26..820107ed9 100644 --- a/storagenode/orders/service.go +++ b/storagenode/orders/service.go @@ -5,8 +5,6 @@ package orders import ( "context" - "errors" - "io" "math/rand" "sync" "time" @@ -177,212 +175,6 @@ func (service *Service) SendOrders(ctx context.Context, now time.Time) { defer mon.Task()(&ctx)(nil) service.log.Debug("sending") - // If there are orders in the database, send from there. - // Otherwise, send from the filestore. - service.sendOrdersFromDB(ctx) - service.sendOrdersFromFileStore(ctx, now) -} - -func (service *Service) sendOrdersFromDB(ctx context.Context) (hasOrders bool) { - defer mon.Task()(&ctx)(nil) - - const batchSize = 1000 - hasOrders = true - - ordersBySatellite, err := service.orders.ListUnsentBySatellite(ctx) - if err != nil { - if ordersBySatellite == nil { - service.log.Error("listing orders", zap.Error(err)) - hasOrders = false - return hasOrders - } - - service.log.Warn("DB contains invalid marshalled orders", zap.Error(err)) - } - - requests := make(chan ArchiveRequest, batchSize) - var batchGroup errgroup.Group - batchGroup.Go(func() error { return service.handleBatches(ctx, requests) }) - - if len(ordersBySatellite) > 0 { - var group errgroup.Group - ctx, cancel := context.WithTimeout(ctx, service.config.SenderTimeout) - defer cancel() - - for satelliteID, orders := range ordersBySatellite { - satelliteID, orders := satelliteID, orders - group.Go(func() error { - service.Settle(ctx, satelliteID, orders, requests) - return nil - }) - } - - _ = group.Wait() // doesn't return errors - } else { - service.log.Debug("no orders to send") - hasOrders = false - } - - close(requests) - err = batchGroup.Wait() - if err != nil { - service.log.Error("archiving orders", zap.Error(err)) - } - return hasOrders -} - -// Settle uploads orders to the satellite. -// -// DEPRECATED server always return an error if this endpoint is called. -func (service *Service) Settle(ctx context.Context, satelliteID storj.NodeID, orders []*ordersfile.Info, requests chan ArchiveRequest) { - log := service.log.Named(satelliteID.String()) - err := service.settle(ctx, log, satelliteID, orders, requests) - if err != nil { - log.Error("failed to settle orders", zap.Error(err)) - } -} - -func (service *Service) handleBatches(ctx context.Context, requests chan ArchiveRequest) (err error) { - defer mon.Task()(&ctx)(&err) - - // In case anything goes wrong, discard everything from the channel. - defer func() { - for range requests { - } - }() - - buffer := make([]ArchiveRequest, 0, cap(requests)) - - archive := func(ctx context.Context, archivedAt time.Time, requests ...ArchiveRequest) error { - if err := service.orders.Archive(ctx, time.Now().UTC(), buffer...); err != nil { - if !OrderNotFoundError.Has(err) { - return err - } - - service.log.Warn("some unsent order aren't in the DB", zap.Error(err)) - } - - return nil - } - - for request := range requests { - buffer = append(buffer, request) - if len(buffer) < cap(buffer) { - continue - } - - if err := archive(ctx, time.Now().UTC(), buffer...); err != nil { - return err - } - buffer = buffer[:0] - } - - if len(buffer) > 0 { - return archive(ctx, time.Now().UTC(), buffer...) - } - - return nil -} - -func (service *Service) settle(ctx context.Context, log *zap.Logger, satelliteID storj.NodeID, orders []*ordersfile.Info, requests chan ArchiveRequest) (err error) { - defer mon.Task()(&ctx)(&err) - - log.Info("sending", zap.Int("count", len(orders))) - defer log.Info("finished") - - nodeurl, err := service.trust.GetNodeURL(ctx, satelliteID) - if err != nil { - return OrderError.New("unable to get satellite address: %w", err) - } - - conn, err := service.dialer.DialNodeURL(ctx, nodeurl) - if err != nil { - return OrderError.New("unable to connect to the satellite: %w", err) - } - defer func() { err = errs.Combine(err, conn.Close()) }() - - stream, err := pb.NewDRPCOrdersClient(conn).Settlement(ctx) - if err != nil { - return OrderError.New("failed to start settlement: %w", err) - } - - var group errgroup.Group - var sendErrors errs.Group - - group.Go(func() error { - for _, order := range orders { - req := pb.SettlementRequest{ - Limit: order.Limit, - Order: order.Order, - } - err := stream.Send(&req) - if err != nil { - err = OrderError.New("sending settlement agreements returned an error: %w", err) - log.Error("rpc client when sending new orders settlements", - zap.Error(err), - zap.Any("request", req), - ) - sendErrors.Add(err) - return nil - } - } - - err := stream.CloseSend() - if err != nil { - err = OrderError.New("CloseSend settlement agreements returned an error: %w", err) - log.Error("rpc client error when closing sender ", zap.Error(err)) - sendErrors.Add(err) - } - - return nil - }) - - var errList errs.Group - for { - response, err := stream.Recv() - if err != nil { - if errors.Is(err, io.EOF) { - break - } - - err = OrderError.New("failed to receive settlement response: %w", err) - log.Error("rpc client error when receiving new order settlements", zap.Error(err)) - errList.Add(err) - break - } - - var status Status - switch response.Status { - case pb.SettlementResponse_ACCEPTED: - status = StatusAccepted - case pb.SettlementResponse_REJECTED: - status = StatusRejected - default: - err := OrderError.New("unexpected settlement status response: %d", response.Status) - log.Error("rpc client received an unexpected new orders settlement status", - zap.Error(err), zap.Any("response", response), - ) - errList.Add(err) - continue - } - - requests <- ArchiveRequest{ - Satellite: satelliteID, - Serial: response.SerialNumber, - Status: status, - } - } - - // errors of this group are reported to sendErrors and it always return nil - _ = group.Wait() - errList.Add(sendErrors...) - - return errList.Err() -} - -func (service *Service) sendOrdersFromFileStore(ctx context.Context, now time.Time) { - defer mon.Task()(&ctx)(nil) - errorSatellites := make(map[storj.NodeID]struct{}) var errorSatellitesMu sync.Mutex