Merge 'master' branch

Change-Id: Ica5c25607a951076dd9f77e35e308062f71ce3f0
Michal Niewrzal 2020-12-07 13:33:26 +01:00
commit 218bbeaffa
125 changed files with 652 additions and 1184 deletions


@ -27,6 +27,7 @@ func cmdAdminRun(cmd *cobra.Command, args []string) (err error) {
}
db, err := satellitedb.Open(ctx, log.Named("db"), runCfg.Database, satellitedb.Options{
ApplicationName: "satellite-admin",
APIKeysLRUOptions: runCfg.APIKeysLRUOptions(),
})
if err != nil {


@ -32,6 +32,7 @@ func cmdAPIRun(cmd *cobra.Command, args []string) (err error) {
}
db, err := satellitedb.Open(ctx, log.Named("db"), runCfg.Database, satellitedb.Options{
ApplicationName: "satellite-api",
APIKeysLRUOptions: runCfg.APIKeysLRUOptions(),
RevocationLRUOptions: runCfg.RevocationLRUOptions(),
})
@ -42,7 +43,7 @@ func cmdAPIRun(cmd *cobra.Command, args []string) (err error) {
err = errs.Combine(err, db.Close())
}()
pointerDB, err := metainfo.OpenStore(ctx, log.Named("pointerdb"), runCfg.Config.Metainfo.DatabaseURL)
pointerDB, err := metainfo.OpenStore(ctx, log.Named("pointerdb"), runCfg.Config.Metainfo.DatabaseURL, "satellite-api")
if err != nil {
return errs.New("Error creating metainfodb connection on satellite api: %+v", err)
}


@ -24,7 +24,7 @@ import (
func runBillingCmd(ctx context.Context, cmdFunc func(context.Context, *stripecoinpayments.Service, *dbx.DB) error) error {
// Open SatelliteDB for the Payment Service
logger := zap.L()
db, err := satellitedb.Open(ctx, logger.Named("db"), runCfg.Database, satellitedb.Options{})
db, err := satellitedb.Open(ctx, logger.Named("db"), runCfg.Database, satellitedb.Options{ApplicationName: "satellite-billing"})
if err != nil {
return errs.New("error connecting to master database on satellite: %+v", err)
}


@ -32,7 +32,7 @@ func generateInvoicesCSV(ctx context.Context, period compensation.Period, out io
WithheldPercents: generateInvoicesCfg.Compensation.WithheldPercents,
}
db, err := satellitedb.Open(ctx, zap.L().Named("db"), generateInvoicesCfg.Database, satellitedb.Options{})
db, err := satellitedb.Open(ctx, zap.L().Named("db"), generateInvoicesCfg.Database, satellitedb.Options{ApplicationName: "satellite-compensation"})
if err != nil {
return errs.New("error connecting to master database on satellite: %+v", err)
}
@ -141,7 +141,7 @@ func recordPeriod(ctx context.Context, paystubsCSV, paymentsCSV string) (int, in
return 0, 0, err
}
db, err := satellitedb.Open(ctx, zap.L().Named("db"), recordPeriodCfg.Database, satellitedb.Options{})
db, err := satellitedb.Open(ctx, zap.L().Named("db"), recordPeriodCfg.Database, satellitedb.Options{ApplicationName: "satellite-compensation"})
if err != nil {
return 0, 0, errs.New("error connecting to master database on satellite: %+v", err)
}
@ -165,7 +165,7 @@ func recordOneOffPayments(ctx context.Context, paymentsCSV string) (int, error)
return 0, err
}
db, err := satellitedb.Open(ctx, zap.L().Named("db"), recordOneOffPaymentsCfg.Database, satellitedb.Options{})
db, err := satellitedb.Open(ctx, zap.L().Named("db"), recordOneOffPaymentsCfg.Database, satellitedb.Options{ApplicationName: "satellite-compensation"})
if err != nil {
return 0, errs.New("error connecting to master database on satellite: %+v", err)
}


@ -28,7 +28,7 @@ func cmdGCRun(cmd *cobra.Command, args []string) (err error) {
return errs.New("Failed to load identity: %+v", err)
}
db, err := satellitedb.Open(ctx, log.Named("db"), runCfg.Database, satellitedb.Options{})
db, err := satellitedb.Open(ctx, log.Named("db"), runCfg.Database, satellitedb.Options{ApplicationName: "satellite-gc"})
if err != nil {
return errs.New("Error starting master database on satellite GC: %+v", err)
}
@ -36,7 +36,7 @@ func cmdGCRun(cmd *cobra.Command, args []string) (err error) {
err = errs.Combine(err, db.Close())
}()
pointerDB, err := metainfo.OpenStore(ctx, log.Named("pointerdb"), runCfg.Metainfo.DatabaseURL)
pointerDB, err := metainfo.OpenStore(ctx, log.Named("pointerdb"), runCfg.Metainfo.DatabaseURL, "satellite-gc")
if err != nil {
return errs.New("Error creating pointerDB connection GC: %+v", err)
}


@ -27,7 +27,7 @@ import (
// generateGracefulExitCSV creates a report with graceful exit data for exiting or exited nodes in a given period.
func generateGracefulExitCSV(ctx context.Context, completed bool, start time.Time, end time.Time, output io.Writer) error {
db, err := satellitedb.Open(ctx, zap.L().Named("db"), gracefulExitCfg.Database, satellitedb.Options{})
db, err := satellitedb.Open(ctx, zap.L().Named("db"), gracefulExitCfg.Database, satellitedb.Options{ApplicationName: "satellite-gracefulexit"})
if err != nil {
return errs.New("error connecting to master database on satellite: %+v", err)
}


@ -326,6 +326,7 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) {
}
db, err := satellitedb.Open(ctx, log.Named("db"), runCfg.Database, satellitedb.Options{
ApplicationName: "satellite-core",
ReportedRollupsReadBatchSize: runCfg.Orders.SettlementBatchSize,
SaveRollupBatchSize: runCfg.Tally.SaveRollupBatchSize,
ReadRollupBatchSize: runCfg.Tally.ReadRollupBatchSize,
@ -337,7 +338,7 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) {
err = errs.Combine(err, db.Close())
}()
pointerDB, err := metainfo.OpenStore(ctx, log.Named("pointerdb"), runCfg.Metainfo.DatabaseURL)
pointerDB, err := metainfo.OpenStore(ctx, log.Named("pointerdb"), runCfg.Metainfo.DatabaseURL, "satellite-core")
if err != nil {
return errs.New("Error creating metainfodb connection: %+v", err)
}
@ -414,7 +415,7 @@ func cmdMigrationRun(cmd *cobra.Command, args []string) (err error) {
ctx, _ := process.Ctx(cmd)
log := zap.L()
db, err := satellitedb.Open(ctx, log.Named("migration"), runCfg.Database, satellitedb.Options{})
db, err := satellitedb.Open(ctx, log.Named("migration"), runCfg.Database, satellitedb.Options{ApplicationName: "satellite-migration"})
if err != nil {
return errs.New("Error creating new master database connection for satellitedb migration: %+v", err)
}
@ -427,7 +428,7 @@ func cmdMigrationRun(cmd *cobra.Command, args []string) (err error) {
return errs.New("Error creating tables for master database on satellite: %+v", err)
}
pdb, err := metainfo.OpenStore(ctx, log.Named("migration"), runCfg.Metainfo.DatabaseURL)
pdb, err := metainfo.OpenStore(ctx, log.Named("migration"), runCfg.Metainfo.DatabaseURL, "satellite-migration")
if err != nil {
return errs.New("Error creating pointer database connection on satellite: %+v", err)
}
@ -477,7 +478,7 @@ func cmdQDiag(cmd *cobra.Command, args []string) (err error) {
ctx, _ := process.Ctx(cmd)
// open the master db
database, err := satellitedb.Open(ctx, zap.L().Named("db"), qdiagCfg.Database, satellitedb.Options{})
database, err := satellitedb.Open(ctx, zap.L().Named("db"), qdiagCfg.Database, satellitedb.Options{ApplicationName: "satellite-qdiag"})
if err != nil {
return errs.New("error connecting to master database on satellite: %+v", err)
}


@ -31,7 +31,7 @@ func cmdRepairerRun(cmd *cobra.Command, args []string) (err error) {
return errs.New("Failed to load identity: %+v", err)
}
db, err := satellitedb.Open(ctx, log.Named("db"), runCfg.Database, satellitedb.Options{})
db, err := satellitedb.Open(ctx, log.Named("db"), runCfg.Database, satellitedb.Options{ApplicationName: "satellite-repairer"})
if err != nil {
return errs.New("Error starting master database: %+v", err)
}
@ -39,7 +39,7 @@ func cmdRepairerRun(cmd *cobra.Command, args []string) (err error) {
err = errs.Combine(err, db.Close())
}()
pointerDB, err := metainfo.OpenStore(ctx, log.Named("pointerdb"), runCfg.Metainfo.DatabaseURL)
pointerDB, err := metainfo.OpenStore(ctx, log.Named("pointerdb"), runCfg.Metainfo.DatabaseURL, "satellite-repairer")
if err != nil {
return errs.New("Error creating metainfo database connection: %+v", err)
}


@ -32,7 +32,7 @@ var headers = []string{
// GenerateAttributionCSV creates a report with value attribution data for a partner within a given period.
func GenerateAttributionCSV(ctx context.Context, database string, partnerID uuid.UUID, start time.Time, end time.Time, output io.Writer) error {
log := zap.L().Named("db")
db, err := satellitedb.Open(ctx, log, database, satellitedb.Options{})
db, err := satellitedb.Open(ctx, log, database, satellitedb.Options{ApplicationName: "satellite-attribution"})
if err != nil {
return errs.New("error connecting to master database on satellite: %+v", err)
}


@ -21,7 +21,7 @@ import (
// generateNodeUsageCSV creates a report with node usage data for all nodes in a given period which can be used for payments.
func generateNodeUsageCSV(ctx context.Context, start time.Time, end time.Time, output io.Writer) error {
db, err := satellitedb.Open(ctx, zap.L().Named("db"), nodeUsageCfg.Database, satellitedb.Options{})
db, err := satellitedb.Open(ctx, zap.L().Named("db"), nodeUsageCfg.Database, satellitedb.Options{ApplicationName: "satellite-nodeusage"})
if err != nil {
return errs.New("error connecting to master database on satellite: %+v", err)
}


@ -49,7 +49,7 @@ func cmdDelete(cmd *cobra.Command, args []string) (err error) {
ctx, _ := process.Ctx(cmd)
log := zap.L()
db, err := metainfo.OpenStore(ctx, log.Named("pointerdb"), deleteCfg.DatabaseURL)
db, err := metainfo.OpenStore(ctx, log.Named("pointerdb"), deleteCfg.DatabaseURL, "satellite-reaper")
if err != nil {
return errs.New("error connecting database: %+v", err)
}


@ -49,7 +49,7 @@ func cmdDetect(cmd *cobra.Command, args []string) (err error) {
log.Warn("Failed to initialize telemetry batcher on segment reaper", zap.Error(err))
}
db, err := metainfo.OpenStore(ctx, log.Named("pointerdb"), detectCfg.DatabaseURL)
db, err := metainfo.OpenStore(ctx, log.Named("pointerdb"), detectCfg.DatabaseURL, "satellite-reaper")
if err != nil {
return errs.New("error connecting database: %+v", err)
}


@ -355,6 +355,8 @@ func newNetwork(flags *Flags) (*Processes, error) {
apiProcess.Arguments["setup"] = append(apiProcess.Arguments["setup"],
"--database", masterDBURL,
"--metainfo.database-url", metainfoDBURL,
"--orders.include-encrypted-metadata=true",
"--orders.encryption-keys", "0100000000000000=0100000000000000000000000000000000000000000000000000000000000000",
)
}
apiProcess.ExecBefore["run"] = func(process *Process) error {
@ -392,6 +394,8 @@ func newNetwork(flags *Flags) (*Processes, error) {
coreProcess.Arguments = withCommon(apiProcess.Directory, Arguments{
"run": {
"--debug.addr", net.JoinHostPort(host, port(satellitePeer, i, debugPeerHTTP)),
"--orders.include-encrypted-metadata=true",
"--orders.encryption-keys", "0100000000000000=0100000000000000000000000000000000000000000000000000000000000000",
},
})
coreProcess.WaitForExited(migrationProcess)
@ -419,6 +423,8 @@ func newNetwork(flags *Flags) (*Processes, error) {
"run": {
"repair",
"--debug.addr", net.JoinHostPort(host, port(satellitePeer, i, debugRepairerHTTP)),
"--orders.include-encrypted-metadata=true",
"--orders.encryption-keys", "0100000000000000=0100000000000000000000000000000000000000000000000000000000000000",
},
})
repairProcess.WaitForExited(migrationProcess)
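Aside (not part of the diff): the --orders.encryption-keys value above looks like hex(keyID)=hex(key), i.e. an 8-byte key ID and a 32-byte key whose first byte is 0x01. Below is a minimal sketch of building the same key pair programmatically with the orders API that appears later in this change (testplanet); the import paths are the usual storj/storj ones and are assumed here.

package main

import (
	"log"

	"storj.io/common/storj"
	"storj.io/storj/satellite/orders"
)

func main() {
	// Illustrative only: the key material the flag value above appears to encode,
	// constructed with orders.NewEncryptionKeys as used later in this change.
	keys, err := orders.NewEncryptionKeys(orders.EncryptionKey{
		ID:  orders.EncryptionKeyID{1}, // 0x01 followed by seven zero bytes
		Key: storj.Key{1},              // 0x01 followed by 31 zero bytes
	})
	if err != nil {
		log.Fatal(err)
	}
	_ = keys
}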


@ -6,15 +6,17 @@ package cmd
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
"os"
"path/filepath"
"github.com/btcsuite/btcutil/base58"
"github.com/spf13/cobra"
"github.com/zeebo/errs"
"storj.io/common/fpath"
"storj.io/common/pb"
"storj.io/private/cfgstruct"
"storj.io/private/process"
@ -24,6 +26,7 @@ import (
type registerConfig struct {
AuthService string `help:"the address to the service you wish to register your access with" default:"" basic-help:"true"`
Public bool `help:"if the access should be public" default:"false" basic-help:"true"`
AWSProfile string `help:"update AWS credentials file, appending the credentials using this profile name" default:"" basic-help:"true"`
AccessConfig
}
@ -128,7 +131,7 @@ func accessInspect(cmd *cobra.Command, args []string) (err error) {
func parseAccess(access string) (sa string, apiKey string, ea string, err error) {
data, version, err := base58.CheckDecode(access)
if err != nil || version != 0 {
return "", "", "", errors.New("invalid access grant format")
return "", "", "", errs.New("invalid access grant format: %w", err)
}
p := new(pb.Scope)
@ -138,7 +141,7 @@ func parseAccess(access string) (sa string, apiKey string, ea string, err error)
eaData, err := pb.Marshal(p.EncryptionAccess)
if err != nil {
return "", "", "", errs.New("unable to marshal encryption access: %v", err)
return "", "", "", errs.New("unable to marshal encryption access: %w", err)
}
apiKey = base58.CheckEncode(p.ApiKey, 0)
@ -148,7 +151,7 @@ func parseAccess(access string) (sa string, apiKey string, ea string, err error)
func registerAccess(cmd *cobra.Command, args []string) (err error) {
if len(args) == 0 {
return fmt.Errorf("no access specified")
return errs.New("no access specified")
}
if registerCfg.AuthService == "" {
@ -162,7 +165,7 @@ func registerAccess(cmd *cobra.Command, args []string) (err error) {
if err == nil && access != nil {
accessRaw, err = access.Serialize()
if err != nil {
return fmt.Errorf("error serializing named access '%s'", accessRaw)
return errs.New("error serializing named access '%s': %w", accessRaw, err)
}
}
@ -203,5 +206,52 @@ func registerAccess(cmd *cobra.Command, args []string) (err error) {
fmt.Println("Secret Key: ", secretKey)
fmt.Println("Endpoint: ", respBody["endpoint"])
// update AWS credential file if requested
if registerCfg.AWSProfile != "" {
credentialsPath, err := getAwsCredentialsPath()
if err != nil {
return err
}
err = writeAWSCredentials(credentialsPath, registerCfg.AWSProfile, accessKey, secretKey)
if err != nil {
return err
}
}
return nil
}
// getAwsCredentialsPath returns the expected AWS credentials path.
func getAwsCredentialsPath() (string, error) {
if credentialsPath, found := os.LookupEnv("AWS_SHARED_CREDENTIALS_FILE"); found {
return credentialsPath, nil
}
homeDir, err := os.UserHomeDir()
if err != nil {
return "", errs.Wrap(err)
}
return filepath.Join(homeDir, ".aws", "credentials"), nil
}
// writeAWSCredentials appends to credentialsPath using AWS-compliant credential formatting.
func writeAWSCredentials(credentialsPath, profileName, accessKey, secretKey string) error {
oldCredentials, err := ioutil.ReadFile(credentialsPath)
if err != nil && !os.IsNotExist(err) {
return errs.Wrap(err)
}
const format = "\n[%s]\naws_access_key_id = %s\naws_secret_access_key = %s\n"
newCredentials := fmt.Sprintf(format, profileName, accessKey, secretKey)
var fileMode os.FileMode
fileInfo, err := os.Stat(credentialsPath)
if err == nil {
fileMode = fileInfo.Mode()
} else {
fileMode = 0644
}
err = fpath.AtomicWriteFile(credentialsPath, append(oldCredentials, newCredentials...), fileMode)
if err != nil {
return errs.Wrap(err)
}
fmt.Printf("Updated AWS credential file %s with profile '%s'\n", credentialsPath, profileName)
return nil
}
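Aside (not part of the diff): a minimal sketch of the INI section writeAWSCredentials appends, produced by rendering the same format constant; the profile name and keys below are made-up examples.

package main

import "fmt"

func main() {
	// Same format string as in writeAWSCredentials above; values are examples only.
	const format = "\n[%s]\naws_access_key_id = %s\naws_secret_access_key = %s\n"
	fmt.Printf(format, "storj-gateway", "EXAMPLE_ACCESS_KEY", "EXAMPLE_SECRET_KEY")
	// Prints:
	//
	// [storj-gateway]
	// aws_access_key_id = EXAMPLE_ACCESS_KEY
	// aws_secret_access_key = EXAMPLE_SECRET_KEY
}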

go.mod

@ -42,7 +42,7 @@ require (
golang.org/x/sys v0.0.0-20200929083018-4d22bbb62b3c
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e
google.golang.org/api v0.20.0 // indirect
storj.io/common v0.0.0-20201124202331-31c1d1dc486d
storj.io/common v0.0.0-20201204143755-a03c37168cb1
storj.io/drpc v0.0.16
storj.io/monkit-jaeger v0.0.0-20200518165323-80778fc3f91b
storj.io/private v0.0.0-20201026143115-bc926bfa3bca

go.sum

@ -82,7 +82,6 @@ github.com/calebcase/tmpfile v1.0.2 h1:1AGuhKiUu4J6wxz6lxuF6ck3f8G2kaV6KSEny0RGC
github.com/calebcase/tmpfile v1.0.2/go.mod h1:iErLeG/iqJr8LaQ/gYRv4GXdqssi3jg4iSzvrA06/lw=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cheekybits/genny v1.0.0 h1:uGGa4nei+j20rOSeDeP5Of12XVm7TGUd4dJA9RDitfE=
github.com/cheekybits/genny v1.0.0/go.mod h1:+tQajlRqAUrPI7DOSpB0XAqZYtQakVtB7wXkRAgjxjQ=
github.com/cheggaaa/pb/v3 v3.0.5 h1:lmZOti7CraK9RSjzExsY53+WWfub9Qv13B5m4ptEoPE=
github.com/cheggaaa/pb/v3 v3.0.5/go.mod h1:X1L61/+36nz9bjIsrDU52qHKOQukUQe2Ge+YvGuquCw=
@ -370,7 +369,6 @@ github.com/lib/pq v1.1.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.3.0 h1:/qkRGz8zljWiDcFvgpwUpwIAPu3r07TDvs3Rws+o/pU=
github.com/lib/pq v1.3.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lucas-clemente/quic-go v0.7.1-0.20201102053916-272229abf044 h1:M6zB4Rs4SJDk9IBIvC3ozl23+b0d1Q7NOlHnbxuc3AY=
github.com/lucas-clemente/quic-go v0.7.1-0.20201102053916-272229abf044/go.mod h1:ZUygOqIoai0ASXXLJ92LTnKdbqh9MHCLTX6Nr1jUrK0=
github.com/lunixbochs/vtclean v1.0.0/go.mod h1:pHhQNgMf3btfWnGBVipUOjRYhoOsdGqdm/+2c2E2WMI=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
@ -378,9 +376,7 @@ github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzR
github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/mailru/easyjson v0.0.0-20190312143242-1de009706dbe/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/marten-seemann/qpack v0.2.1/go.mod h1:F7Gl5L1jIgN1D11ucXefiuJS9UMVP2opoCp2jDKb7wc=
github.com/marten-seemann/qtls v0.10.0 h1:ECsuYUKalRL240rRD4Ri33ISb7kAQ3qGDlrrl55b2pc=
github.com/marten-seemann/qtls v0.10.0/go.mod h1:UvMd1oaYDACI99/oZUYLzMCkBXQVT0aGm99sJhbT8hs=
github.com/marten-seemann/qtls-go1-15 v0.1.1 h1:LIH6K34bPVttyXnUWixk0bzH6/N07VxbSabxn5A5gZQ=
github.com/marten-seemann/qtls-go1-15 v0.1.1/go.mod h1:GyFwywLKkRt+6mfU99csTEY1joMZz5vmB1WNZH3P81I=
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ=
@ -699,6 +695,7 @@ golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa h1:F+8P+gmewFQYRk6JoLQLwjBCTu3mcIURZfNkVweuRKA=
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200707034311-ab3426394381 h1:VXak5I6aEWmAXeQjA+QSZzlgNrpq9mjcfDemuexIKsU=
@ -905,11 +902,10 @@ sourcegraph.com/sourcegraph/go-diff v0.5.0/go.mod h1:kuch7UrkMzY0X+p9CRK03kfuPQ2
sourcegraph.com/sqs/pbtypes v0.0.0-20180604144634-d3ebe8f20ae4/go.mod h1:ketZ/q3QxT9HOBeFhu6RdvsftgpsbFHBF5Cas6cDKZ0=
storj.io/common v0.0.0-20200424175742-65ac59022f4f/go.mod h1:pZyXiIE7bGETIRXtfs0nICqMwp7PM8HqnDuyUeldNA0=
storj.io/common v0.0.0-20201026135900-1aaeec90670b/go.mod h1:GqdmNf3fLm2UZX/7Zr0BLFCJ4gFjgm6eHrk/fnmr5jQ=
storj.io/common v0.0.0-20201124202331-31c1d1dc486d h1:QTXYMePGSEAtNbweZifHkMQstrRHkviGaKtueOWPmOU=
storj.io/common v0.0.0-20201124202331-31c1d1dc486d/go.mod h1:ocAfQaE1dpflrdTr8hXRZTWP1bq2jXz7ieGSBVCmHEc=
storj.io/common v0.0.0-20201204143755-a03c37168cb1 h1:SwSIESeyaX3kOhZN1jeNPbegSraFTdxtWD+Dn0dT7y4=
storj.io/common v0.0.0-20201204143755-a03c37168cb1/go.mod h1:6sepaQTRLuygvA+GNPzdgRPOB1+wFfjde76KBWofbMY=
storj.io/drpc v0.0.11/go.mod h1:TiFc2obNjL9/3isMW1Rpxjy8V9uE0B2HMeMFGiiI7Iw=
storj.io/drpc v0.0.14 h1:GCBdymTt1BRw4oHmmUZZlxYXLVRxxYj6x3Ivide2J+I=
storj.io/drpc v0.0.14/go.mod h1:82nfl+6YwRwF6UG31cEWWUqv/FaKvP5SGqUvoqTxCMA=
storj.io/drpc v0.0.16 h1:9sxypc5lKi/0D69cR21BR0S21+IvXfON8L5nXMVNTwQ=
storj.io/drpc v0.0.16/go.mod h1:zdmQ93nx4Z35u11pQ+GAnBy4DGOK3HJCSOfeh2RryTo=


@ -1,11 +1,18 @@
storj.io/storj/cmd/segment-reaper."zombie_segments" IntVal
storj.io/storj/satellite/accounting/tally."total.bytes" IntVal
storj.io/storj/satellite/accounting/tally."total.inline_bytes" IntVal
storj.io/storj/satellite/accounting/tally."total.inline_segments" IntVal
storj.io/storj/satellite/accounting/tally."total.objects" IntVal
storj.io/storj/satellite/accounting/tally."total.remote_bytes" IntVal
storj.io/storj/satellite/accounting/tally."total.remote_segments" IntVal
storj.io/storj/satellite/accounting/tally."total.segments" IntVal
storj.io/storj/satellite/accounting/tally."bucket_bytes" IntVal
storj.io/storj/satellite/accounting/tally."bucket_inline_bytes" IntVal
storj.io/storj/satellite/accounting/tally."bucket_inline_segments" IntVal
storj.io/storj/satellite/accounting/tally."bucket_objects" IntVal
storj.io/storj/satellite/accounting/tally."bucket_remote_bytes" IntVal
storj.io/storj/satellite/accounting/tally."bucket_remote_segments" IntVal
storj.io/storj/satellite/accounting/tally."bucket_segments" IntVal
storj.io/storj/satellite/accounting/tally."total_bytes" IntVal
storj.io/storj/satellite/accounting/tally."total_inline_bytes" IntVal
storj.io/storj/satellite/accounting/tally."total_inline_segments" IntVal
storj.io/storj/satellite/accounting/tally."total_objects" IntVal
storj.io/storj/satellite/accounting/tally."total_remote_bytes" IntVal
storj.io/storj/satellite/accounting/tally."total_remote_segments" IntVal
storj.io/storj/satellite/accounting/tally."total_segments" IntVal
storj.io/storj/satellite/audit."audit_contained_nodes" IntVal
storj.io/storj/satellite/audit."audit_contained_nodes_global" Meter
storj.io/storj/satellite/audit."audit_contained_percentage" FloatVal
@ -39,6 +46,8 @@ storj.io/storj/satellite/audit."reverify_total_in_segment" IntVal
storj.io/storj/satellite/audit."reverify_unknown" IntVal
storj.io/storj/satellite/audit."reverify_unknown_global" Meter
storj.io/storj/satellite/audit."verify_shares_downloaded_successfully" IntVal
storj.io/storj/satellite/contact."failed_dial" Event
storj.io/storj/satellite/contact."failed_ping_node" Event
storj.io/storj/satellite/gracefulexit."graceful_exit_fail_max_failures_percentage" Meter
storj.io/storj/satellite/gracefulexit."graceful_exit_fail_validation" Meter
storj.io/storj/satellite/gracefulexit."graceful_exit_final_bytes_transferred" IntVal


@ -52,7 +52,10 @@ func Open(ctx context.Context, log *zap.Logger, databaseURL string) (multinode.D
return nil, Error.New("unsupported driver %q", driver)
}
source = pgutil.CheckApplicationName(source)
source, err = pgutil.CheckApplicationName(source, "storagenode-multinode")
if err != nil {
return nil, err
}
dbxDB, err := dbx.Open(driver, source)
if err != nil {


@ -17,7 +17,7 @@ var initialized = false
const padding = 2
// Point is a 2D coordinate in console
// Point is a 2D coordinate in console.
// X is the column
// Y is the row
type Point struct{ X, Y int }


@ -92,17 +92,21 @@ func QuerySnapshot(ctx context.Context, db dbschema.Queryer) (*dbschema.Snapshot
}
// CheckApplicationName ensures that the Connection String contains an application name.
func CheckApplicationName(s string) (r string) {
func CheckApplicationName(s string, app string) (string, error) {
if !strings.Contains(s, "application_name") {
if !strings.Contains(s, "?") {
r = s + "?application_name=Satellite"
return
if strings.TrimSpace(app) == "" {
return s, errs.New("application name cannot be empty")
}
r = s + "&application_name=Satellite"
return
if !strings.Contains(s, "?") {
return s + "?application_name=" + app, nil
}
return s + "&application_name=" + app, nil
}
// return source as is if application_name is set
return s
return s, nil
}
// IsConstraintError checks if given error is about constraint violation.
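Aside (not part of the diff): a minimal sketch of calling the new two-argument CheckApplicationName; the connection string is made up and the storj.io/storj/private/dbutil/pgutil import path is assumed.

package main

import (
	"fmt"
	"log"

	"storj.io/storj/private/dbutil/pgutil" // assumed package path
)

func main() {
	source := "postgres://user@localhost/satellitedb?sslmode=disable"

	// The application name is now supplied by the caller; an empty name returns an error.
	source, err := pgutil.CheckApplicationName(source, "satellite-api")
	if err != nil {
		log.Fatal(err)
	}

	// source now carries "&application_name=satellite-api"
	// (it would be "?application_name=..." if the URL had no query string yet).
	fmt.Println(source)
}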


@ -44,7 +44,6 @@ import (
"storj.io/storj/satellite/console/consoleweb"
"storj.io/storj/satellite/contact"
"storj.io/storj/satellite/dbcleanup"
"storj.io/storj/satellite/downtime"
"storj.io/storj/satellite/gc"
"storj.io/storj/satellite/gracefulexit"
"storj.io/storj/satellite/inspector"
@ -185,12 +184,6 @@ type Satellite struct {
Metrics struct {
Chore *metrics.Chore
}
DowntimeTracking struct {
DetectionChore *downtime.DetectionChore
EstimationChore *downtime.EstimationChore
Service *downtime.Service
}
}
// Label returns name for debugger.
@ -405,7 +398,13 @@ func (planet *Planet) newSatellite(ctx context.Context, prefix string, index int
if err != nil {
return nil, err
}
planet.databases = append(planet.databases, redis)
encryptionKeys, err := orders.NewEncryptionKeys(orders.EncryptionKey{
ID: orders.EncryptionKeyID{1},
Key: storj.Key{1},
})
if err != nil {
return nil, err
}
config := satellite.Config{
Server: server.Config{
@ -509,6 +508,8 @@ func (planet *Planet) newSatellite(ctx context.Context, prefix string, index int
FlushInterval: defaultInterval,
NodeStatusLogging: true,
WindowEndpointRolloutPhase: orders.WindowEndpointRolloutPhase3,
IncludeEncryptedMetadata: true,
EncryptionKeys: *encryptionKeys,
},
Checker: checker.Config{
Interval: defaultInterval,
@ -626,12 +627,6 @@ func (planet *Planet) newSatellite(ctx context.Context, prefix string, index int
Metrics: metrics.Config{
ChoreInterval: defaultInterval,
},
Downtime: downtime.Config{
DetectionInterval: defaultInterval,
EstimationInterval: defaultInterval,
EstimationBatchSize: 5,
EstimationConcurrencyLimit: 5,
},
}
if planet.ReferralManager != nil {
@ -769,10 +764,6 @@ func createNewSystem(name string, log *zap.Logger, config satellite.Config, peer
system.Metrics.Chore = peer.Metrics.Chore
system.DowntimeTracking.DetectionChore = peer.DowntimeTracking.DetectionChore
system.DowntimeTracking.EstimationChore = peer.DowntimeTracking.EstimationChore
system.DowntimeTracking.Service = peer.DowntimeTracking.Service
return system
}


@ -22,7 +22,7 @@ type Config struct {
CheckInterval time.Duration `help:"Interval to check the version" default:"0h15m0s"`
}
// Service contains the information and variables to ensure the Software is up to date
// Service contains the information and variables to ensure the Software is up to date.
//
// architecture: Service
type Service struct {


@ -170,13 +170,15 @@ func TestBilling_TrafficAfterFileDeletion(t *testing.T) {
uplink = planet.Uplinks[0]
projectID = uplink.Projects[0].ID
)
err := planet.Uplinks[0].CreateBucket(ctx, planet.Satellites[0], bucketName)
require.NoError(t, err)
// stop any async flushes because we want to be sure when some values are
// written to avoid races
satelliteSys.Orders.Chore.Loop.Pause()
data := testrand.Bytes(5 * memory.KiB)
err := uplink.Upload(ctx, satelliteSys, bucketName, filePath, data)
err = uplink.Upload(ctx, satelliteSys, bucketName, filePath, data)
require.NoError(t, err)
_, err = uplink.Download(ctx, satelliteSys, bucketName, filePath)


@ -138,7 +138,7 @@ type BucketUsageRollup struct {
Before time.Time
}
// StoragenodeAccounting stores information about bandwidth and storage usage for storage nodes
// StoragenodeAccounting stores information about bandwidth and storage usage for storage nodes.
//
// architecture: Database
type StoragenodeAccounting interface {
@ -164,7 +164,7 @@ type StoragenodeAccounting interface {
DeleteTalliesBefore(ctx context.Context, latestRollup time.Time) error
}
// ProjectAccounting stores information about bandwidth and storage usage for projects
// ProjectAccounting stores information about bandwidth and storage usage for projects.
//
// architecture: Database
type ProjectAccounting interface {


@ -21,7 +21,7 @@ type Config struct {
DeleteTallies bool `help:"option for deleting tallies after they are rolled up" default:"true"`
}
// Service is the rollup service for totalling data on storage nodes on daily intervals
// Service is the rollup service for totalling data on storage nodes on daily intervals.
//
// architecture: Chore
type Service struct {


@ -32,7 +32,7 @@ type Config struct {
ReadRollupBatchSize int `help:"how large of batches GetBandwidthSince should process at a time" default:"10000"`
}
// Service is the tally service for data stored on each storage node
// Service is the tally service for data stored on each storage node.
//
// architecture: Chore
type Service struct {
@ -185,26 +185,25 @@ func (service *Service) Tally(ctx context.Context) (err error) {
if len(observer.Bucket) > 0 {
var total accounting.BucketTally
for _, bucket := range observer.Bucket {
monAccounting.IntVal("bucket.objects").Observe(bucket.ObjectCount)
monAccounting.IntVal("bucket_objects").Observe(bucket.ObjectCount) //mon:locked
monAccounting.IntVal("bucket_segments").Observe(bucket.Segments()) //mon:locked
monAccounting.IntVal("bucket_inline_segments").Observe(bucket.InlineSegments) //mon:locked
monAccounting.IntVal("bucket_remote_segments").Observe(bucket.RemoteSegments) //mon:locked
monAccounting.IntVal("bucket.segments").Observe(bucket.Segments())
monAccounting.IntVal("bucket.inline_segments").Observe(bucket.InlineSegments)
monAccounting.IntVal("bucket.remote_segments").Observe(bucket.RemoteSegments)
monAccounting.IntVal("bucket.bytes").Observe(bucket.Bytes())
monAccounting.IntVal("bucket.inline_bytes").Observe(bucket.InlineBytes)
monAccounting.IntVal("bucket.remote_bytes").Observe(bucket.RemoteBytes)
monAccounting.IntVal("bucket_bytes").Observe(bucket.Bytes()) //mon:locked
monAccounting.IntVal("bucket_inline_bytes").Observe(bucket.InlineBytes) //mon:locked
monAccounting.IntVal("bucket_remote_bytes").Observe(bucket.RemoteBytes) //mon:locked
total.Combine(bucket)
}
monAccounting.IntVal("total.objects").Observe(total.ObjectCount) //mon:locked
monAccounting.IntVal("total_objects").Observe(total.ObjectCount) //mon:locked
monAccounting.IntVal("total.segments").Observe(total.Segments()) //mon:locked
monAccounting.IntVal("total.inline_segments").Observe(total.InlineSegments) //mon:locked
monAccounting.IntVal("total.remote_segments").Observe(total.RemoteSegments) //mon:locked
monAccounting.IntVal("total_segments").Observe(total.Segments()) //mon:locked
monAccounting.IntVal("total_inline_segments").Observe(total.InlineSegments) //mon:locked
monAccounting.IntVal("total_remote_segments").Observe(total.RemoteSegments) //mon:locked
monAccounting.IntVal("total.bytes").Observe(total.Bytes()) //mon:locked
monAccounting.IntVal("total.inline_bytes").Observe(total.InlineBytes) //mon:locked
monAccounting.IntVal("total.remote_bytes").Observe(total.RemoteBytes) //mon:locked
monAccounting.IntVal("total_bytes").Observe(total.Bytes()) //mon:locked
monAccounting.IntVal("total_inline_bytes").Observe(total.InlineBytes) //mon:locked
monAccounting.IntVal("total_remote_bytes").Observe(total.RemoteBytes) //mon:locked
}
// return errors if something went wrong.


@ -24,7 +24,7 @@ import (
"storj.io/storj/satellite/payments/stripecoinpayments"
)
// Admin is the satellite core process that runs chores
// Admin is the satellite core process that runs chores.
//
// architecture: Peer
type Admin struct {


@ -54,7 +54,7 @@ import (
"storj.io/storj/satellite/snopayout"
)
// API is the satellite API process
// API is the satellite API process.
//
// architecture: Peer
type API struct {
@ -327,17 +327,6 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
})
peer.Debug.Server.Panel.Add(
debug.Cycle("Orders Chore", peer.Orders.Chore.Loop))
satelliteSignee := signing.SigneeFromPeerIdentity(peer.Identity.PeerIdentity())
peer.Orders.Endpoint = orders.NewEndpoint(
peer.Log.Named("orders:endpoint"),
satelliteSignee,
peer.Orders.DB,
peer.DB.NodeAPIVersion(),
config.Orders.SettlementBatchSize,
config.Orders.WindowEndpointRolloutPhase,
config.Orders.OrdersSemaphoreSize,
)
var err error
peer.Orders.Service, err = orders.NewService(
peer.Log.Named("orders:service"),
@ -354,6 +343,19 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
satelliteSignee := signing.SigneeFromPeerIdentity(peer.Identity.PeerIdentity())
peer.Orders.Endpoint = orders.NewEndpoint(
peer.Log.Named("orders:endpoint"),
satelliteSignee,
peer.Orders.DB,
peer.DB.NodeAPIVersion(),
config.Orders.SettlementBatchSize,
config.Orders.WindowEndpointRolloutPhase,
config.Orders.OrdersSemaphoreSize,
peer.Orders.Service,
)
if err := pb.DRPCRegisterOrders(peer.Server.DRPC(), peer.Orders.Endpoint); err != nil {
return nil, errs.Combine(err, peer.Close())
}


@ -34,7 +34,7 @@ type CSVRow struct {
EgressData int64
}
// DB implements the database for value attribution table
// DB implements the database for value attribution table.
//
// architecture: Database
type DB interface {


@ -34,7 +34,7 @@ type PendingAudit struct {
Path storj.Path
}
// Containment holds information about pending audits for contained nodes
// Containment holds information about pending audits for contained nodes.
//
// architecture: Database
type Containment interface {


@ -25,9 +25,9 @@ import (
)
// TestDisqualificationTooManyFailedAudits does the following:
// * Create a failed audit report for a storagenode
// * Record the audit report several times and check that the node isn't
// disqualified until the audit reputation reaches the cut-off value.
// - Create a failed audit report for a storagenode
// - Record the audit report several times and check that the node isn't
// disqualified until the audit reputation reaches the cut-off value.
func TestDisqualificationTooManyFailedAudits(t *testing.T) {
var (
auditDQCutOff = 0.4


@ -13,7 +13,7 @@ import (
var _ metainfo.Observer = (*PathCollector)(nil)
// PathCollector uses the metainfo loop to add paths to node reservoirs
// PathCollector uses the metainfo loop to add paths to node reservoirs.
//
// architecture: Observer
type PathCollector struct {


@ -13,7 +13,7 @@ import (
"storj.io/storj/satellite/overlay"
)
// Reporter records audit reports in overlay and implements the reporter interface
// Reporter records audit reports in overlay and implements the reporter interface.
//
// architecture: Service
type Reporter struct {


@ -50,7 +50,7 @@ type Share struct {
Data []byte
}
// Verifier helps verify the correctness of a given stripe
// Verifier helps verify the correctness of a given stripe.
//
// architecture: Worker
type Verifier struct {


@ -10,7 +10,7 @@ import (
"storj.io/common/uuid"
)
// APIKeys is interface for working with api keys store
// APIKeys is interface for working with api keys store.
//
// architecture: Database
type APIKeys interface {


@ -32,8 +32,18 @@ func Test_AllBucketNames(t *testing.T) {
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
sat := planet.Satellites[0]
project := planet.Uplinks[0].Projects[0]
service := sat.API.Console.Service
newUser := console.CreateUser{
FullName: "Jack-bucket",
ShortName: "",
Email: "bucketest@test.test",
}
user, err := sat.AddUser(ctx, newUser, 1)
require.NoError(t, err)
project, err := sat.AddProject(ctx, user.ID, "buckettest")
require.NoError(t, err)
bucket1 := storj.Bucket{
ID: testrand.UUID(),
@ -47,33 +57,14 @@ func Test_AllBucketNames(t *testing.T) {
ProjectID: project.ID,
}
_, err := sat.DB.Buckets().CreateBucket(ctx, bucket1)
_, err = sat.DB.Buckets().CreateBucket(ctx, bucket1)
require.NoError(t, err)
_, err = sat.DB.Buckets().CreateBucket(ctx, bucket2)
require.NoError(t, err)
user := console.CreateUser{
FullName: "Jack",
ShortName: "",
Email: "bucketest@test.test",
Password: "123a123",
}
refUserID := ""
regToken, err := service.CreateRegToken(ctx, 1)
require.NoError(t, err)
createdUser, err := service.CreateUser(ctx, user, regToken.Secret, refUserID)
require.NoError(t, err)
activationToken, err := service.GenerateActivationToken(ctx, createdUser.ID, createdUser.Email)
require.NoError(t, err)
err = service.ActivateAccount(ctx, activationToken)
require.NoError(t, err)
token, err := service.Token(ctx, user.Email, user.Password)
// we are using the full name as a password
token, err := sat.API.Console.Service.Token(ctx, user.Email, user.FullName)
require.NoError(t, err)
client := http.Client{}


@ -87,7 +87,7 @@ type Config struct {
console.Config
}
// Server represents console web server
// Server represents console web server.
//
// architecture: Endpoint
type Server struct {


@ -15,7 +15,7 @@ import (
"storj.io/common/uuid"
)
// RegistrationTokens is interface for working with registration tokens
// RegistrationTokens is interface for working with registration tokens.
//
// architecture: Database
type RegistrationTokens interface {


@ -14,7 +14,7 @@ import (
"storj.io/common/uuid"
)
// ResetPasswordTokens is interface for working with reset password tokens
// ResetPasswordTokens is interface for working with reset password tokens.
//
// architecture: Database
type ResetPasswordTokens interface {


@ -76,7 +76,7 @@ var (
ErrEmailUsed = errs.Class("email used")
)
// Service is handling accounts related logic
// Service is handling accounts related logic.
//
// architecture: Service
type Service struct {
@ -1460,7 +1460,12 @@ func (s *Service) GetBucketTotals(ctx context.Context, projectID uuid.UUID, curs
func (s *Service) GetAllBucketNames(ctx context.Context, projectID uuid.UUID) (_ []string, err error) {
defer mon.Task()(&ctx)(&err)
_, err = s.getAuthAndAuditLog(ctx, "get all bucket names", zap.String("projectID", projectID.String()))
auth, err := s.getAuthAndAuditLog(ctx, "get all bucket names", zap.String("projectID", projectID.String()))
if err != nil {
return nil, Error.Wrap(err)
}
_, err = s.isProjectMember(ctx, auth.User.ID, projectID)
if err != nil {
return nil, Error.Wrap(err)
}


@ -160,16 +160,21 @@ func TestService(t *testing.T) {
ProjectID: up2Pro1.ID,
}
_, err := sat.DB.Buckets().CreateBucket(authCtx1, bucket1)
_, err := sat.DB.Buckets().CreateBucket(authCtx2, bucket1)
require.NoError(t, err)
_, err = sat.DB.Buckets().CreateBucket(authCtx1, bucket2)
_, err = sat.DB.Buckets().CreateBucket(authCtx2, bucket2)
require.NoError(t, err)
bucketNames, err := service.GetAllBucketNames(authCtx1, up2Pro1.ID)
bucketNames, err := service.GetAllBucketNames(authCtx2, up2Pro1.ID)
require.NoError(t, err)
require.Equal(t, bucket1.Name, bucketNames[0])
require.Equal(t, bucket2.Name, bucketNames[1])
// Getting someone else's buckets should not work
bucketsForUnauthorizedUser, err := service.GetAllBucketNames(authCtx1, up2Pro1.ID)
require.Error(t, err)
require.Nil(t, bucketsForUnauthorizedUser)
})
})
}


@ -17,7 +17,7 @@ import (
// NoCreditForUpdateErr is an error message used when no credits are found for update when new users sign up.
var NoCreditForUpdateErr = errs.Class("no credit found to update")
// UserCredits holds information to interact with database
// UserCredits holds information to interact with database.
//
// architecture: Database
type UserCredits interface {


@ -83,7 +83,7 @@ func (service *Service) PingBack(ctx context.Context, nodeurl storj.NodeURL) (_
// If there is an error from trying to dial and ping the node, return that error as
// pingErrorMessage and not as the err. We want to use this info to update
// node contact info and do not want to terminate execution by returning an err
mon.Event("failed dial")
mon.Event("failed_dial") //mon:locked
pingNodeSuccess = false
pingErrorMessage = fmt.Sprintf("failed to dial storage node (ID: %s) at address %s: %q",
nodeurl.ID, nodeurl.Address, err,
@ -97,7 +97,7 @@ func (service *Service) PingBack(ctx context.Context, nodeurl storj.NodeURL) (_
_, err = client.pingNode(ctx, &pb.ContactPingRequest{})
if err != nil {
mon.Event("failed ping node")
mon.Event("failed_ping_node") //mon:locked
pingNodeSuccess = false
pingErrorMessage = fmt.Sprintf("failed to ping storage node, your node indicated error code: %d, %q", rpcstatus.Code(err), err)
service.log.Debug("pingBack pingNode error",


@ -32,7 +32,6 @@ import (
"storj.io/storj/satellite/audit"
"storj.io/storj/satellite/contact"
"storj.io/storj/satellite/dbcleanup"
"storj.io/storj/satellite/downtime"
"storj.io/storj/satellite/gc"
"storj.io/storj/satellite/gracefulexit"
"storj.io/storj/satellite/metainfo"
@ -45,7 +44,7 @@ import (
"storj.io/storj/satellite/repair/checker"
)
// Core is the satellite core process that runs chores
// Core is the satellite core process that runs chores.
//
// architecture: Peer
type Core struct {
@ -138,12 +137,6 @@ type Core struct {
Metrics struct {
Chore *metrics.Chore
}
DowntimeTracking struct {
DetectionChore *downtime.DetectionChore
EstimationChore *downtime.EstimationChore
Service *downtime.Service
}
}
// New creates a new satellite.
@ -532,39 +525,6 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
debug.Cycle("Metrics", peer.Metrics.Chore.Loop))
}
{ // setup downtime tracking
peer.DowntimeTracking.Service = downtime.NewService(peer.Log.Named("downtime"), peer.Overlay.Service, peer.Contact.Service)
peer.DowntimeTracking.DetectionChore = downtime.NewDetectionChore(
peer.Log.Named("downtime:detection"),
config.Downtime,
peer.Overlay.Service,
peer.DowntimeTracking.Service,
)
peer.Services.Add(lifecycle.Item{
Name: "downtime:detection",
Run: peer.DowntimeTracking.DetectionChore.Run,
Close: peer.DowntimeTracking.DetectionChore.Close,
})
peer.Debug.Server.Panel.Add(
debug.Cycle("Downtime Detection", peer.DowntimeTracking.DetectionChore.Loop))
peer.DowntimeTracking.EstimationChore = downtime.NewEstimationChore(
peer.Log.Named("downtime:estimation"),
config.Downtime,
peer.Overlay.Service,
peer.DowntimeTracking.Service,
peer.DB.DowntimeTracking(),
)
peer.Services.Add(lifecycle.Item{
Name: "downtime:estimation",
Run: peer.DowntimeTracking.EstimationChore.Run,
Close: peer.DowntimeTracking.EstimationChore.Close,
})
peer.Debug.Server.Panel.Add(
debug.Cycle("Downtime Estimation", peer.DowntimeTracking.EstimationChore.Loop))
}
return peer, nil
}


@ -1,22 +0,0 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package downtime
import (
"time"
"github.com/spacemonkeygo/monkit/v3"
)
var (
mon = monkit.Package()
)
// Config for the chore.
type Config struct {
DetectionInterval time.Duration `help:"how often to run the downtime detection chore." releaseDefault:"1h0s" devDefault:"30s"`
EstimationInterval time.Duration `help:"how often to run the downtime estimation chore" releaseDefault:"1h0s" devDefault:"30s"`
EstimationBatchSize int `help:"the downtime estimation chore should check this many offline nodes" releaseDefault:"1000" devDefault:"100"`
EstimationConcurrencyLimit int `help:"max number of concurrent connections in estimation chore" default:"10"`
}


@ -1,31 +0,0 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package downtime
import (
"context"
"time"
"storj.io/common/storj"
)
// NodeOfflineTime represents a record in the nodes_offline_time table.
type NodeOfflineTime struct {
NodeID storj.NodeID
TrackedAt time.Time
TimeOffline time.Duration
}
// DB implements basic operations for downtime tracking service
//
// architecture: Database
type DB interface {
// Add adds a record for a particular node ID with the amount of time it has been offline.
Add(ctx context.Context, nodeID storj.NodeID, trackedTime time.Time, timeOffline time.Duration) error
// GetOfflineTime gets the total amount of offline time for a node within a certain timeframe.
// "total offline time" is defined as the sum of all offline time windows that begin inside the provided time window.
// An offline time window that began before `begin` but that overlaps with the provided time window is not included.
// An offline time window that begins within the provided time window, but that extends beyond `end` is included.
GetOfflineTime(ctx context.Context, nodeID storj.NodeID, begin, end time.Time) (time.Duration, error)
}


@ -1,68 +0,0 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package downtime_test
import (
"testing"
"time"
"github.com/stretchr/testify/require"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/satellite"
"storj.io/storj/satellite/satellitedb/satellitedbtest"
)
func TestDowntime(t *testing.T) {
// test basic downtime functionality
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
downtimeDB := db.DowntimeTracking()
now := time.Now()
oneYearAgo := time.Now().Add(-time.Hour * 24 * 365)
// node1 was offline for one hour recently, and offline for two hours a year ago
nodeID1 := testrand.NodeID()
err := downtimeDB.Add(ctx, nodeID1, now, time.Hour)
require.NoError(t, err)
err = downtimeDB.Add(ctx, nodeID1, oneYearAgo, 2*time.Hour)
require.NoError(t, err)
// node2 was offline for two hours a year ago
nodeID2 := testrand.NodeID()
err = downtimeDB.Add(ctx, nodeID2, oneYearAgo, 2*time.Hour)
require.NoError(t, err)
// if we only check recent history, node1 offline time should be 1 hour
duration, err := downtimeDB.GetOfflineTime(ctx, nodeID1, now.Add(-time.Hour), now.Add(time.Hour))
require.NoError(t, err)
require.Equal(t, duration, time.Hour)
// if we only check recent history, node2 should not be offline at all
duration, err = downtimeDB.GetOfflineTime(ctx, nodeID2, now.Add(-time.Hour), now.Add(time.Hour))
require.NoError(t, err)
require.Equal(t, duration, time.Duration(0))
// if we only check old history, node1 offline time should be 2 hours
duration, err = downtimeDB.GetOfflineTime(ctx, nodeID1, oneYearAgo.Add(-time.Hour), oneYearAgo.Add(time.Hour))
require.NoError(t, err)
require.Equal(t, duration, 2*time.Hour)
// if we only check old history, node2 offline time should be 2 hours
duration, err = downtimeDB.GetOfflineTime(ctx, nodeID2, oneYearAgo.Add(-time.Hour), oneYearAgo.Add(time.Hour))
require.NoError(t, err)
require.Equal(t, duration, 2*time.Hour)
// if we check all history (from before oneYearAgo to after now), node1 should be offline for 3 hours
duration, err = downtimeDB.GetOfflineTime(ctx, nodeID1, oneYearAgo.Add(-time.Hour), now.Add(time.Hour))
require.NoError(t, err)
require.Equal(t, duration, 3*time.Hour)
// if we check all history (from before oneYearAgo to after now), node2 should be offline for 2 hours
duration, err = downtimeDB.GetOfflineTime(ctx, nodeID2, oneYearAgo.Add(-time.Hour), now.Add(time.Hour))
require.NoError(t, err)
require.Equal(t, duration, 2*time.Hour)
})
}


@ -1,73 +0,0 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package downtime
import (
"context"
"go.uber.org/zap"
"storj.io/common/sync2"
"storj.io/storj/satellite/overlay"
)
// DetectionChore looks for nodes that have not checked in and tries to contact them.
//
// architecture: Chore
type DetectionChore struct {
log *zap.Logger
Loop *sync2.Cycle
config Config
overlay *overlay.Service
service *Service
}
// NewDetectionChore instantiates DetectionChore.
func NewDetectionChore(log *zap.Logger, config Config, overlay *overlay.Service, service *Service) *DetectionChore {
return &DetectionChore{
log: log,
Loop: sync2.NewCycle(config.DetectionInterval),
config: config,
overlay: overlay,
service: service,
}
}
// Run starts the chore.
func (chore *DetectionChore) Run(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
return chore.Loop.Run(ctx, func(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
chore.log.Debug("checking for nodes that have not had a successful check-in within the interval.",
zap.Stringer("interval", chore.config.DetectionInterval))
nodeLastContacts, err := chore.overlay.GetSuccesfulNodesNotCheckedInSince(ctx, chore.config.DetectionInterval)
if err != nil {
chore.log.Error("error retrieving node addresses for downtime detection.", zap.Error(err))
return nil
}
chore.log.Debug("nodes that have had not had a successful check-in with the interval.",
zap.Stringer("interval", chore.config.DetectionInterval),
zap.Int("count", len(nodeLastContacts)))
for _, nodeLastContact := range nodeLastContacts {
success, err := chore.service.CheckAndUpdateNodeAvailability(ctx, nodeLastContact.URL)
if err != nil {
chore.log.Error("error during downtime detection ping back.",
zap.Bool("success", success),
zap.Error(err))
continue
}
}
return nil
})
}
// Close closes chore.
func (chore *DetectionChore) Close() error {
chore.Loop.Close()
return nil
}


@ -1,95 +0,0 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package downtime_test
import (
"testing"
"time"
"github.com/stretchr/testify/require"
"storj.io/common/pb"
"storj.io/common/testcontext"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite/overlay"
)
func TestDetectionChore(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 0,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
node := planet.StorageNodes[0]
satellite := planet.Satellites[0]
node.Contact.Chore.Pause(ctx)
satellite.DowntimeTracking.DetectionChore.Loop.Pause()
// setup
nodeInfo := planet.StorageNodes[0].Contact.Service.Local()
info := overlay.NodeCheckInInfo{
NodeID: nodeInfo.ID,
IsUp: true,
Address: &pb.NodeAddress{
Address: nodeInfo.Address,
},
Operator: &nodeInfo.Operator,
Version: &nodeInfo.Version,
}
sixtyOneMinutes := 61 * time.Minute
{ // test node ping back success
// check-in 1 hours, 1 minute ago for that node
oldCheckinTime := time.Now().Add(-sixtyOneMinutes)
err := satellite.DB.OverlayCache().UpdateCheckIn(ctx, info, oldCheckinTime, overlay.NodeSelectionConfig{})
require.NoError(t, err)
// get successful nodes that haven't checked in with the hour. should return 1
nodeLastContacts, err := satellite.DB.OverlayCache().GetSuccesfulNodesNotCheckedInSince(ctx, time.Hour)
require.NoError(t, err)
require.Len(t, nodeLastContacts, 1)
require.WithinDuration(t, oldCheckinTime, nodeLastContacts[0].LastContactSuccess, time.Second)
// run detection chore
satellite.DowntimeTracking.DetectionChore.Loop.TriggerWait()
// node should not be in "offline" list or "successful, not checked in" list
nodeLastContacts, err = satellite.DB.OverlayCache().GetSuccesfulNodesNotCheckedInSince(ctx, time.Hour)
require.NoError(t, err)
require.Len(t, nodeLastContacts, 0)
nodesOffline, err := satellite.DB.OverlayCache().GetOfflineNodesLimited(ctx, 10)
require.NoError(t, err)
require.Len(t, nodesOffline, 0)
}
{ // test node ping back failure
// check-in 1 hour, 1 minute ago for that node - again
oldCheckinTime := time.Now().Add(-sixtyOneMinutes)
err := satellite.DB.OverlayCache().UpdateCheckIn(ctx, info, oldCheckinTime, overlay.NodeSelectionConfig{})
require.NoError(t, err)
// close the node service so the ping back will fail
err = node.Server.Close()
require.NoError(t, err)
// get successful nodes that haven't checked in with the hour. should return 1 - again
nodeLastContacts, err := satellite.DB.OverlayCache().GetSuccesfulNodesNotCheckedInSince(ctx, time.Hour)
require.NoError(t, err)
require.Len(t, nodeLastContacts, 1)
require.WithinDuration(t, oldCheckinTime, nodeLastContacts[0].LastContactSuccess, time.Second)
// run detection chore - again
satellite.DowntimeTracking.DetectionChore.Loop.TriggerWait()
// node should be in "offline" list but not in "successful, not checked in" list
nodeLastContacts, err = satellite.DB.OverlayCache().GetSuccesfulNodesNotCheckedInSince(ctx, time.Hour)
require.NoError(t, err)
require.Len(t, nodeLastContacts, 0)
nodesOffline, err := satellite.DB.OverlayCache().GetOfflineNodesLimited(ctx, 10)
require.NoError(t, err)
require.Len(t, nodesOffline, 1)
}
})
}


@ -1,95 +0,0 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package downtime
import (
"context"
"time"
"go.uber.org/zap"
"storj.io/common/sync2"
"storj.io/storj/satellite/overlay"
)
// EstimationChore estimates how long nodes have been offline.
//
// architecture: Chore
type EstimationChore struct {
log *zap.Logger
Loop *sync2.Cycle
limiter *sync2.Limiter
config Config
startTime time.Time
overlay *overlay.Service
service *Service
db DB
}
// NewEstimationChore instantiates EstimationChore.
func NewEstimationChore(log *zap.Logger, config Config, overlay *overlay.Service, service *Service, db DB) *EstimationChore {
if config.EstimationConcurrencyLimit <= 0 {
config.EstimationConcurrencyLimit = 1
}
return &EstimationChore{
log: log,
Loop: sync2.NewCycle(config.EstimationInterval),
limiter: sync2.NewLimiter(config.EstimationConcurrencyLimit),
config: config,
startTime: time.Now().UTC(),
overlay: overlay,
service: service,
db: db,
}
}
// Run starts the chore.
func (chore *EstimationChore) Run(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
return chore.Loop.Run(ctx, func(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
chore.log.Debug("checking uptime of failed nodes",
zap.Stringer("interval", chore.config.EstimationInterval))
offlineNodes, err := chore.overlay.GetOfflineNodesLimited(ctx, chore.config.EstimationBatchSize)
if err != nil {
chore.log.Error("error getting offline nodes", zap.Error(err))
return nil
}
for _, node := range offlineNodes {
node := node
chore.limiter.Go(ctx, func() {
success, err := chore.service.CheckAndUpdateNodeAvailability(ctx, node.URL)
if err != nil {
chore.log.Error("error during downtime estimation ping back",
zap.Bool("success", success),
zap.Error(err))
return
}
if !success && node.LastContactFailure.After(chore.startTime) {
now := time.Now().UTC()
duration := now.Sub(node.LastContactFailure)
err = chore.db.Add(ctx, node.URL.ID, now, duration)
if err != nil {
chore.log.Error("error adding node seconds offline information.",
zap.Stringer("node ID", node.URL.ID),
zap.Stringer("duration", duration),
zap.Error(err))
}
}
})
}
chore.limiter.Wait()
return nil
})
}
// Close closes chore.
func (chore *EstimationChore) Close() error {
chore.Loop.Close()
return nil
}


@ -1,162 +0,0 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package downtime_test
import (
"testing"
"time"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"storj.io/common/testcontext"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite"
"storj.io/storj/satellite/downtime"
"storj.io/storj/satellite/overlay"
)
// TestEstimationChoreBasic tests the basic functionality of the downtime estimation chore:
//
// 1. Test that when a node that had one failed ping, and one successful ping >1s later does not have recorded downtime.
// 2. Test that when a node that had one failed ping, and another failed ping >1s later has at least 1s of recorded downtime.
func TestEstimationChoreBasic(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 2, UplinkCount: 0,
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
config.Downtime.EstimationBatchSize = 2
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
satellite.DowntimeTracking.EstimationChore.Loop.Pause()
{ // test last_contact_success is updated for nodes where last_contact_failure > last_contact_success, but node is online
var oldNodes []*overlay.NodeDossier
for _, node := range planet.StorageNodes {
node.Contact.Chore.Pause(ctx)
// mark node as failing an uptime check so the estimation chore picks it up
_, err := satellite.DB.OverlayCache().UpdateUptime(ctx, node.ID(), false)
require.NoError(t, err)
oldNode, err := satellite.DB.OverlayCache().Get(ctx, node.ID())
require.NoError(t, err)
require.True(t, oldNode.Reputation.LastContactSuccess.Before(oldNode.Reputation.LastContactFailure))
oldNodes = append(oldNodes, oldNode)
}
// run estimation chore
time.Sleep(1 * time.Second) // wait for 1s because estimation chore truncates offline duration to seconds
satellite.DowntimeTracking.EstimationChore.Loop.TriggerWait()
for i, node := range planet.StorageNodes {
// get offline time for node, expect it to be 0 since node was online when chore pinged it
downtime, err := satellite.DB.DowntimeTracking().GetOfflineTime(ctx, node.ID(), time.Now().Add(-5*time.Hour), time.Now())
require.NoError(t, err)
require.True(t, downtime == 0)
// expect node last contact success was updated
newNode, err := satellite.DB.OverlayCache().Get(ctx, node.ID())
require.NoError(t, err)
require.Equal(t, oldNodes[i].Reputation.LastContactFailure, newNode.Reputation.LastContactFailure)
require.True(t, oldNodes[i].Reputation.LastContactSuccess.Before(newNode.Reputation.LastContactSuccess))
require.True(t, newNode.Reputation.LastContactFailure.Before(newNode.Reputation.LastContactSuccess))
}
}
{ // test last_contact_failure is updated and downtime is recorded for nodes where last_contact_failure > last_contact_success and node is offline
var oldNodes []*overlay.NodeDossier
for _, node := range planet.StorageNodes {
// mark node as failing an uptime check so the estimation chore picks it up
_, err := satellite.DB.OverlayCache().UpdateUptime(ctx, node.ID(), false)
require.NoError(t, err)
oldNode, err := satellite.DB.OverlayCache().Get(ctx, node.ID())
require.NoError(t, err)
require.True(t, oldNode.Reputation.LastContactSuccess.Before(oldNode.Reputation.LastContactFailure))
// close the node service so the ping back will fail
err = node.Server.Close()
require.NoError(t, err)
oldNodes = append(oldNodes, oldNode)
}
// run estimation chore
time.Sleep(1 * time.Second) // wait for 1s because estimation chore truncates offline duration to seconds
satellite.DowntimeTracking.EstimationChore.Loop.TriggerWait()
for i, node := range planet.StorageNodes {
// get offline time for node, expect it to be greater than 0 since node has been offline for at least 1s
downtime, err := satellite.DB.DowntimeTracking().GetOfflineTime(ctx, node.ID(), time.Now().Add(-5*time.Hour), time.Now())
require.NoError(t, err)
require.True(t, downtime > 0)
// expect node last contact failure was updated
newNode, err := satellite.DB.OverlayCache().Get(ctx, node.ID())
require.NoError(t, err)
require.Equal(t, oldNodes[i].Reputation.LastContactSuccess, newNode.Reputation.LastContactSuccess)
require.True(t, oldNodes[i].Reputation.LastContactFailure.Before(newNode.Reputation.LastContactFailure))
}
}
})
}
// TestEstimationChoreSatelliteDowntime tests the situation where downtime is estimated when the satellite was started after the last failed ping.
// If a storage node has a failed ping and another ping fails later, the estimation chore normally takes the difference between these pings and records it as downtime.
// If the satellite was started between the old failed ping and the new failed ping, we do not want to risk including satellite downtime in our calculation - no downtime should be recorded in this case.
func TestEstimationChoreSatelliteDowntime(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 0,
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
config.Downtime.EstimationBatchSize = 1
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
node := planet.StorageNodes[0]
satellite := planet.Satellites[0]
node.Contact.Chore.Pause(ctx)
satellite.DowntimeTracking.EstimationChore.Loop.Pause()
// mark node as failing an uptime check so the estimation chore picks it up
_, err := satellite.DB.OverlayCache().UpdateUptime(ctx, node.ID(), false)
require.NoError(t, err)
oldNode, err := satellite.DB.OverlayCache().Get(ctx, node.ID())
require.NoError(t, err)
require.True(t, oldNode.Reputation.LastContactSuccess.Before(oldNode.Reputation.LastContactFailure))
// close the node service so the ping back will fail
err = node.Server.Close()
require.NoError(t, err)
// create new estimation chore that starts after the node's last contacted time
newEstimationChore := downtime.NewEstimationChore(
satellite.Log,
downtime.Config{
EstimationInterval: 1 * time.Second,
EstimationBatchSize: 10,
EstimationConcurrencyLimit: 10,
},
satellite.Overlay.Service,
satellite.DowntimeTracking.Service,
satellite.DB.DowntimeTracking(),
)
time.Sleep(1 * time.Second) // wait for 1s because estimation chore truncates offline duration to seconds
var group errgroup.Group
group.Go(func() error {
return newEstimationChore.Run(ctx)
})
defer func() {
err = newEstimationChore.Close()
require.NoError(t, err)
err = group.Wait()
require.NoError(t, err)
}()
newEstimationChore.Loop.TriggerWait()
// since the estimation chore was started after the last ping, the node's offline time should be 0
downtime, err := satellite.DB.DowntimeTracking().GetOfflineTime(ctx, node.ID(), time.Now().Add(-5*time.Hour), time.Now())
require.NoError(t, err)
require.EqualValues(t, downtime, 0)
// expect node last contact failure was updated
newNode, err := satellite.DB.OverlayCache().Get(ctx, node.ID())
require.NoError(t, err)
require.Equal(t, oldNode.Reputation.LastContactSuccess, newNode.Reputation.LastContactSuccess)
require.True(t, oldNode.Reputation.LastContactFailure.Before(newNode.Reputation.LastContactFailure))
})
}

View File

@ -1,74 +0,0 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package downtime
import (
"context"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/storj"
"storj.io/storj/satellite/contact"
"storj.io/storj/satellite/overlay"
)
// Service is a service for downtime checking.
//
// architecture: Service
type Service struct {
log *zap.Logger
overlay *overlay.Service
contact *contact.Service
}
// NewService creates a new downtime tracking service.
func NewService(log *zap.Logger, overlay *overlay.Service, contact *contact.Service) *Service {
return &Service{
log: log,
overlay: overlay,
contact: contact,
}
}
// CheckAndUpdateNodeAvailability tries to ping the supplied address and updates the uptime based on ping success or failure. Returns true if the ping and uptime updates are successful.
func (service *Service) CheckAndUpdateNodeAvailability(ctx context.Context, nodeurl storj.NodeURL) (success bool, err error) {
defer mon.Task()(&ctx)(&err)
pingNodeSuccess, pingErrorMessage, err := service.contact.PingBack(ctx, nodeurl)
if err != nil {
service.log.Error("error during downtime detection ping back.",
zap.String("ping error", pingErrorMessage),
zap.Error(err))
return false, errs.Wrap(err)
}
if pingNodeSuccess {
_, err = service.overlay.UpdateUptime(ctx, nodeurl.ID, true)
if err != nil {
service.log.Error("error updating node contact success information.",
zap.Stringer("node ID", nodeurl.ID),
zap.Error(err))
return false, errs.Wrap(err)
}
return true, nil
}
_, err = service.overlay.UpdateUptime(ctx, nodeurl.ID, false)
if err != nil {
service.log.Error("error updating node contact failure information.",
zap.Stringer("node ID", nodeurl.ID),
zap.Error(err))
return false, errs.Wrap(err)
}
return false, nil
}
// Close closes resources.
func (service *Service) Close() error { return nil }
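
The contract documented above for the (removed) CheckAndUpdateNodeAvailability is: ping the node, update its uptime accordingly, and return true only when both the ping and the uptime update succeed. A minimal, self-contained sketch of a caller honoring that contract, with a small interface standing in for the real service (the real method takes a storj.NodeURL rather than a string):

```go
package main

import (
	"context"
	"fmt"
)

// availabilityChecker stands in for the removed downtime service; only the
// (success bool, err error) contract described above is assumed.
type availabilityChecker interface {
	CheckAndUpdateNodeAvailability(ctx context.Context, nodeID string) (success bool, err error)
}

func report(ctx context.Context, c availabilityChecker, nodeID string) {
	online, err := c.CheckAndUpdateNodeAvailability(ctx, nodeID)
	switch {
	case err != nil:
		fmt.Println("check failed:", err) // ping back or uptime update errored
	case online:
		fmt.Println("node responded; last contact success updated")
	default:
		fmt.Println("node did not respond; last contact failure updated")
	}
}

type fake struct{ online bool }

func (f fake) CheckAndUpdateNodeAvailability(context.Context, string) (bool, error) {
	return f.online, nil
}

func main() {
	report(context.Background(), fake{online: false}, "node-1")
}
```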

View File

@ -1,62 +0,0 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package downtime_test
import (
"testing"
"time"
"github.com/stretchr/testify/require"
"storj.io/common/testcontext"
"storj.io/storj/private/testplanet"
)
func TestCheckNodeAvailability(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 0,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
node := planet.StorageNodes[0]
satellite := planet.Satellites[0]
node.Contact.Chore.Pause(ctx)
satellite.Audit.Chore.Loop.Pause()
// test that last success and failure checks are before now
beforeSuccessfulCheck := time.Now()
dossier, err := satellite.Overlay.Service.Get(ctx, node.ID())
require.NoError(t, err)
require.True(t, dossier.Reputation.LastContactSuccess.Before(beforeSuccessfulCheck))
require.True(t, dossier.Reputation.LastContactFailure.Before(beforeSuccessfulCheck))
success, err := satellite.DowntimeTracking.Service.CheckAndUpdateNodeAvailability(ctx, node.NodeURL())
require.NoError(t, err)
require.True(t, success)
lastFailure := dossier.Reputation.LastContactFailure
// now test that CheckAndUpdateNodeAvailability updated with a success, and the last contact failure is the same.
dossier, err = satellite.Overlay.Service.Get(ctx, node.ID())
require.NoError(t, err)
require.True(t, dossier.Reputation.LastContactSuccess.After(beforeSuccessfulCheck))
require.True(t, dossier.Reputation.LastContactFailure.Equal(lastFailure))
lastSuccess := dossier.Reputation.LastContactSuccess
// shutdown the node
err = node.Server.Close()
require.NoError(t, err)
// now test that CheckAndUpdateNodeAvailability updated with a failure, and the last contact success is the same
beforeFailedCheck := time.Now()
success, err = satellite.DowntimeTracking.Service.CheckAndUpdateNodeAvailability(ctx, node.NodeURL())
require.NoError(t, err)
require.False(t, success)
dossier, err = satellite.Overlay.Service.Get(ctx, node.ID())
require.NoError(t, err)
require.True(t, dossier.Reputation.LastContactFailure.After(beforeFailedCheck))
require.True(t, dossier.Reputation.LastContactSuccess.Equal(lastSuccess))
})
}

View File

@ -28,7 +28,7 @@ import (
"storj.io/storj/satellite/overlay"
)
// GarbageCollection is the satellite garbage collection process
// GarbageCollection is the satellite garbage collection process.
//
// architecture: Peer
type GarbageCollection struct {

View File

@ -17,7 +17,7 @@ import (
var _ metainfo.Observer = (*PieceTracker)(nil)
// PieceTracker implements the metainfo loop observer interface for garbage collection
// PieceTracker implements the metainfo loop observer interface for garbage collection.
//
// architecture: Observer
type PieceTracker struct {

View File

@ -33,6 +33,7 @@ type Config struct {
Enabled bool `help:"set if garbage collection is enabled or not" releaseDefault:"true" devDefault:"true"`
SkipFirst bool `help:"if true, skip the first run of GC" releaseDefault:"true" devDefault:"false"`
RunInCore bool `help:"if true, run garbage collection as part of the core" releaseDefault:"false" devDefault:"false"`
// value for InitialPieces currently based on average pieces per node
InitialPieces int `help:"the initial number of pieces expected for a storage node to have, used for creating a filter" releaseDefault:"400000" devDefault:"10"`
FalsePositiveRate float64 `help:"the false positive rate used for creating a garbage collection bloom filter" releaseDefault:"0.1" devDefault:"0.1"`
@ -40,7 +41,7 @@ type Config struct {
RetainSendTimeout time.Duration `help:"the amount of time to allow a node to handle a retain request" default:"1m"`
}
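
For intuition about the InitialPieces and FalsePositiveRate settings above: the classic Bloom filter sizing formula m = -n·ln(p)/(ln 2)² gives the number of bits needed for n expected pieces at false-positive rate p. Whether the GC service sizes its filters exactly this way is not shown in this diff, so the sketch below is illustrative only:

```go
package main

import (
	"fmt"
	"math"
)

// bloomBits returns the classic optimal bit count for n items at false
// positive rate p: m = -n*ln(p)/(ln 2)^2.
func bloomBits(n, p float64) float64 {
	return -n * math.Log(p) / (math.Ln2 * math.Ln2)
}

func main() {
	bits := bloomBits(400000, 0.1) // release defaults from the config above
	fmt.Printf("%.0f bits (~%.0f KiB) per node filter\n", bits, bits/8/1024)
}
```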
// Service implements the garbage collection service
// Service implements the garbage collection service.
//
// architecture: Chore
type Service struct {

View File

@ -36,7 +36,7 @@ type TransferQueueItem struct {
OrderLimitSendCount int
}
// DB implements CRUD operations for graceful exit service
// DB implements CRUD operations for graceful exit service.
//
// architecture: Database
type DB interface {

View File

@ -17,7 +17,7 @@ import (
var _ metainfo.Observer = (*PathCollector)(nil)
// PathCollector uses the metainfo loop to add paths to node reservoirs
// PathCollector uses the metainfo loop to add paths to node reservoirs.
//
// architecture: Observer
type PathCollector struct {

View File

@ -27,7 +27,7 @@ var (
const lastSegmentIndex = int64(-1)
// Endpoint for checking object and segment health
// Endpoint for checking object and segment health.
//
// architecture: Endpoint
type Endpoint struct {

View File

@ -34,7 +34,7 @@ var (
mon = monkit.Package()
)
// Sender sends emails
// Sender sends emails.
//
// architecture: Service
type Sender interface {
@ -48,7 +48,7 @@ type Message interface {
Subject() string
}
// Service sends template-backed email messages through SMTP
// Service sends template-backed email messages through SMTP.
//
// architecture: Service
type Service struct {

View File

@ -20,8 +20,7 @@ var mon = monkit.Package()
var _ mailservice.Sender = (*LinkClicker)(nil)
// LinkClicker is mailservice.Sender that click all links
// from html msg parts
// LinkClicker is a mailservice.Sender that clicks all links from HTML message parts.
//
// architecture: Service
type LinkClicker struct{}

View File

@ -31,7 +31,7 @@ type Config struct {
StaticDir string `help:"path to static resources" default:""`
}
// Server represents marketing offersweb server
// Server represents marketing offersweb server.
//
// architecture: Endpoint
type Server struct {

View File

@ -141,7 +141,7 @@ type PointerDB interface {
}
// OpenStore returns database for storing pointer data.
func OpenStore(ctx context.Context, logger *zap.Logger, dbURLString string) (db PointerDB, err error) {
func OpenStore(ctx context.Context, logger *zap.Logger, dbURLString string, app string) (db PointerDB, err error) {
_, source, implementation, err := dbutil.SplitConnStr(dbURLString)
if err != nil {
return nil, err
@ -149,9 +149,9 @@ func OpenStore(ctx context.Context, logger *zap.Logger, dbURLString string) (db
switch implementation {
case dbutil.Postgres:
db, err = postgreskv.Open(ctx, source)
db, err = postgreskv.Open(ctx, source, app)
case dbutil.Cockroach:
db, err = cockroachkv.Open(ctx, source)
db, err = cockroachkv.Open(ctx, source, app)
default:
err = Error.New("unsupported db implementation: %s", dbURLString)
}

View File

@ -12,7 +12,7 @@ import (
"storj.io/storj/satellite/metainfo/metabase"
)
// BucketsDB is the interface for the database to interact with buckets
// BucketsDB is the interface for the database to interact with buckets.
//
// architecture: Database
type BucketsDB interface {

View File

@ -27,7 +27,7 @@ type Config struct {
Enabled bool `help:"set if expired segment cleanup is enabled or not" releaseDefault:"true" devDefault:"true"`
}
// Chore implements the expired segment cleanup chore
// Chore implements the expired segment cleanup chore.
//
// architecture: Chore
type Chore struct {

View File

@ -33,7 +33,7 @@ import (
// * upload 2 inline files
// * connect two observers to the metainfo loop
// * run the metainfo loop
// * expect that each observer has seen
// * expect that each observer has seen:
// - 5 remote files
// - 5 remote segments
// - 2 inline files/segments

View File

@ -193,7 +193,7 @@ func (db *DB) BeginSegment(ctx context.Context, opts BeginSegment) (err error) {
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return Error.New("unable to query segments: %w", err)
}
err = nil //nolint:ineffassign ignore any other err result (explicitly)
err = nil //nolint: ineffassign, ignore any other err result (explicitly)
return nil
})

View File

@ -24,7 +24,7 @@ var (
ErrBucketNotEmpty = errs.Class("bucket not empty")
)
// Service structure
// Service provides the metainfo service dependencies.
//
// architecture: Service
type Service struct {

View File

@ -21,7 +21,7 @@ var (
mon = monkit.Package()
)
// Endpoint for querying node stats for the SNO
// Endpoint for querying node stats for the SNO.
//
// architecture: Endpoint
type Endpoint struct {

View File

@ -26,7 +26,7 @@ import (
"storj.io/storj/satellite/nodeapiversion"
)
// DB implements saving order after receiving from storage node
// DB implements saving order after receiving from storage node.
//
// architecture: Database
type DB interface {
@ -200,7 +200,7 @@ type ProcessOrderResponse struct {
Status pb.SettlementResponse_Status
}
// Endpoint for orders receiving
// Endpoint for orders receiving.
//
// architecture: Endpoint
type Endpoint struct {
@ -211,13 +211,17 @@ type Endpoint struct {
settlementBatchSize int
windowEndpointRolloutPhase WindowEndpointRolloutPhase
ordersSemaphore chan struct{}
ordersService *Service
}
// NewEndpoint new orders receiving endpoint.
//
// ordersSemaphoreSize controls the number of concurrent clients allowed to submit orders at once.
// A value of zero means unlimited.
func NewEndpoint(log *zap.Logger, satelliteSignee signing.Signee, db DB, nodeAPIVersionDB nodeapiversion.DB, settlementBatchSize int, windowEndpointRolloutPhase WindowEndpointRolloutPhase, ordersSemaphoreSize int) *Endpoint {
func NewEndpoint(log *zap.Logger, satelliteSignee signing.Signee, db DB, nodeAPIVersionDB nodeapiversion.DB,
settlementBatchSize int, windowEndpointRolloutPhase WindowEndpointRolloutPhase,
ordersSemaphoreSize int, ordersService *Service) *Endpoint {
var ordersSemaphore chan struct{}
if ordersSemaphoreSize > 0 {
ordersSemaphore = make(chan struct{}, ordersSemaphoreSize)
@ -231,6 +235,7 @@ func NewEndpoint(log *zap.Logger, satelliteSignee signing.Signee, db DB, nodeAPI
settlementBatchSize: settlementBatchSize,
windowEndpointRolloutPhase: windowEndpointRolloutPhase,
ordersSemaphore: ordersSemaphore,
ordersService: ordersService,
}
}
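
The semaphore wiring above uses a buffered channel with capacity ordersSemaphoreSize and leaves the channel nil when the size is zero (the documented "unlimited" case). The endpoint's own acquire/release helpers are not shown in this diff; the sketch below illustrates one common way the nil-means-unlimited convention is handled:

```go
package main

import "sync"

// acquire and release are illustrative helpers, not the endpoint's actual
// code: a nil semaphore means "unlimited", so both become no-ops.
func acquire(sem chan struct{}) {
	if sem == nil {
		return
	}
	sem <- struct{}{}
}

func release(sem chan struct{}) {
	if sem == nil {
		return
	}
	<-sem
}

func main() {
	sem := make(chan struct{}, 2) // at most 2 concurrent submissions
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			acquire(sem)
			defer release(sem)
			// ... handle one settlement stream ...
		}()
	}
	wg.Wait()
}
```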
@ -642,22 +647,56 @@ func (endpoint *Endpoint) SettlementWithWindowFinal(stream pb.DRPCOrders_Settlem
storagenodeSettled[int32(orderLimit.Action)] += order.Amount
bucketPrefix, err := endpoint.DB.GetBucketIDFromSerialNumber(ctx, serialNum)
if err != nil {
log.Info("get bucketPrefix from serial number table err", zap.Error(err))
continue
var bucketName string
var projectID uuid.UUID
if len(orderLimit.EncryptedMetadata) > 0 {
metadata, err := endpoint.ordersService.DecryptOrderMetadata(ctx, orderLimit)
if err != nil {
log.Info("decrypt order metadata err:", zap.Error(err))
mon.Event("bucketinfo_from_orders_metadata_error_1")
goto idFromSerialTable
}
bucketInfo, err := metabase.ParseBucketPrefix(
metabase.BucketPrefix(metadata.GetProjectBucketPrefix()),
)
if err != nil {
log.Info("decrypt order: ParseBucketPrefix", zap.Error(err))
mon.Event("bucketinfo_from_orders_metadata_error_2")
goto idFromSerialTable
}
bucketName = bucketInfo.BucketName
projectID = bucketInfo.ProjectID
mon.Event("bucketinfo_from_orders_metadata")
}
bucket, err := metabase.ParseBucketPrefix(metabase.BucketPrefix(bucketPrefix))
if err != nil {
log.Info("split bucket err", zap.Error(err), zap.String("bucketPrefix", string(bucketPrefix)))
continue
// If we cannot get the bucket name and project ID from the orderLimit metadata, then fallback
// to the old method of getting it from the serial_numbers table.
// This is only temporary to make sure the orderLimit metadata is working correctly.
idFromSerialTable:
if bucketName == "" || projectID.IsZero() {
bucketPrefix, err := endpoint.DB.GetBucketIDFromSerialNumber(ctx, serialNum)
if err != nil {
log.Info("get bucketPrefix from serial number table err", zap.Error(err))
continue
}
bucket, err := metabase.ParseBucketPrefix(metabase.BucketPrefix(bucketPrefix))
if err != nil {
log.Info("split bucket err", zap.Error(err), zap.String("bucketPrefix", string(bucketPrefix)))
continue
}
bucketName = bucket.BucketName
projectID = bucket.ProjectID
mon.Event("bucketinfo_from_serial_number")
}
bucketSettled[bucketIDAction{
bucketname: bucket.BucketName,
projectID: bucket.ProjectID,
bucketname: bucketName,
projectID: projectID,
action: orderLimit.Action,
}] += order.Amount
}
if len(storagenodeSettled) == 0 {
log.Debug("no orders were successfully processed", zap.Int("received count", receivedCount))
status = pb.SettlementWithWindowResponse_REJECTED
@ -701,7 +740,9 @@ func (endpoint *Endpoint) SettlementWithWindowFinal(stream pb.DRPCOrders_Settlem
})
}
func (endpoint *Endpoint) isValid(ctx context.Context, log *zap.Logger, order *pb.Order, orderLimit *pb.OrderLimit, peerID storj.NodeID, window int64) bool {
func (endpoint *Endpoint) isValid(ctx context.Context, log *zap.Logger, order *pb.Order,
orderLimit *pb.OrderLimit, peerID storj.NodeID, window int64) bool {
if orderLimit.StorageNodeId != peerID {
log.Debug("storage node id mismatch")
mon.Event("order_not_valid_storagenodeid")

View File

@ -22,8 +22,12 @@ import (
"storj.io/uplink/private/eestream"
)
// ErrDownloadFailedNotEnoughPieces is returned when download failed due to missing pieces.
var ErrDownloadFailedNotEnoughPieces = errs.Class("not enough pieces for download")
var (
// ErrDownloadFailedNotEnoughPieces is returned when download failed due to missing pieces.
ErrDownloadFailedNotEnoughPieces = errs.Class("not enough pieces for download")
// ErrDecryptOrderMetadata is returned when a step of decrypting metadata fails.
ErrDecryptOrderMetadata = errs.Class("decrypting order metadata")
)
// Config is a configuration struct for orders Service.
type Config struct {
@ -674,3 +678,24 @@ func (service *Service) UpdatePutInlineOrder(ctx context.Context, bucket metabas
return service.orders.UpdateBucketBandwidthInline(ctx, bucket.ProjectID, []byte(bucket.BucketName), pb.PieceAction_PUT, amount, intervalStart)
}
// DecryptOrderMetadata decrypts the order metadata.
func (service *Service) DecryptOrderMetadata(ctx context.Context, order *pb.OrderLimit) (_ *pb.OrderLimitMetadata, err error) {
defer mon.Task()(&ctx)(&err)
var orderKeyID EncryptionKeyID
copy(orderKeyID[:], order.EncryptedMetadataKeyId)
var key = service.encryptionKeys.Default
if key.ID != orderKeyID {
val, ok := service.encryptionKeys.KeyByID[orderKeyID]
if !ok {
return nil, ErrDecryptOrderMetadata.New("no encryption key found that matches the order.EncryptedMetadataKeyId")
}
key = EncryptionKey{
ID: orderKeyID,
Key: val,
}
}
return key.DecryptMetadata(order.SerialNumber, order.EncryptedMetadata)
}

View File

@ -0,0 +1,70 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package orders_test
import (
"testing"
"github.com/stretchr/testify/require"
"storj.io/common/memory"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite/metainfo/metabase"
)
func TestOrderLimitsEncryptedMetadata(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
const (
bucketName = "testbucket"
filePath = "test/path"
)
var (
satellitePeer = planet.Satellites[0]
uplinkPeer = planet.Uplinks[0]
projectID = uplinkPeer.Projects[0].ID
)
// Setup: Upload an object and create order limits
require.NoError(t, uplinkPeer.Upload(ctx, satellitePeer, bucketName, filePath, testrand.Bytes(5*memory.KiB)))
bucket := metabase.BucketLocation{ProjectID: projectID, BucketName: bucketName}
items, _, err := satellitePeer.Metainfo.Service.List(ctx, metabase.SegmentKey{}, "", true, 10, ^uint32(0))
require.NoError(t, err)
require.Equal(t, 1, len(items))
pointer, err := satellitePeer.Metainfo.Service.Get(ctx, metabase.SegmentKey(items[0].Path))
require.NoError(t, err)
limits, _, err := satellitePeer.Orders.Service.CreateGetOrderLimits(ctx, bucket, pointer)
require.NoError(t, err)
require.Equal(t, 2, len(limits))
// Test: get the bucket name and project ID from the encrypted metadata and
// compare with the old method of getting the data from the serial numbers table.
orderLimit1 := limits[0].Limit
require.True(t, len(orderLimit1.EncryptedMetadata) > 0)
_, err = metabase.ParseBucketPrefix(metabase.BucketPrefix(""))
require.Error(t, err)
var x []byte
_, err = metabase.ParseBucketPrefix(metabase.BucketPrefix(x))
require.Error(t, err)
actualOrderMetadata, err := satellitePeer.Orders.Service.DecryptOrderMetadata(ctx, orderLimit1)
require.NoError(t, err)
actualBucketInfo, err := metabase.ParseBucketPrefix(
metabase.BucketPrefix(actualOrderMetadata.GetProjectBucketPrefix()),
)
require.NoError(t, err)
require.Equal(t, bucketName, actualBucketInfo.BucketName)
require.Equal(t, projectID, actualBucketInfo.ProjectID)
bucketPrefix, err := satellitePeer.Orders.DB.GetBucketIDFromSerialNumber(ctx, orderLimit1.SerialNumber)
require.NoError(t, err)
bucket1, err := metabase.ParseBucketPrefix(metabase.BucketPrefix(bucketPrefix))
require.NoError(t, err)
require.Equal(t, actualBucketInfo.BucketName, bucket1.BucketName)
require.Equal(t, actualBucketInfo.ProjectID, bucket1.ProjectID)
})
}

View File

@ -146,15 +146,10 @@ func (signer *Signer) Sign(ctx context.Context, node storj.NodeURL, pieceNum int
return nil, ErrSigner.New("default encryption key is missing")
}
bucketID, err := signer.Service.buckets.GetBucketID(ctx, signer.Bucket)
if err != nil {
return nil, ErrSigner.Wrap(err)
}
encrypted, err := encryptionKey.EncryptMetadata(
signer.Serial,
&pb.OrderLimitMetadata{
BucketId: bucketID[:],
ProjectBucketPrefix: []byte(signer.Bucket.Prefix()),
},
)
if err != nil {

View File

@ -42,10 +42,10 @@ func TestSigner_EncryptedMetadata(t *testing.T) {
project, err := uplink.GetProject(ctx, satellite)
require.NoError(t, err)
bucketName := "testbucket"
bucketLocation := metabase.BucketLocation{
ProjectID: uplink.Projects[0].ID,
BucketName: "testbucket",
BucketName: bucketName,
}
_, err = project.EnsureBucket(ctx, bucketLocation.BucketName)
@ -71,10 +71,10 @@ func TestSigner_EncryptedMetadata(t *testing.T) {
metadata, err := ekeys.Default.DecryptMetadata(addressedLimit.Limit.SerialNumber, addressedLimit.Limit.EncryptedMetadata)
require.NoError(t, err)
bucketID, err := satellite.DB.Buckets().GetBucketID(ctx, bucketLocation)
bucketInfo, err := metabase.ParseBucketPrefix(metabase.BucketPrefix(metadata.ProjectBucketPrefix))
require.NoError(t, err)
require.Equal(t, bucketID[:], metadata.BucketId)
require.Equal(t, bucketInfo.BucketName, bucketName)
require.Equal(t, bucketInfo.ProjectID, uplink.Projects[0].ID)
})
}

View File

@ -11,7 +11,7 @@ import (
"storj.io/storj/satellite/internalpb"
)
// Inspector is a RPC service for inspecting overlay internals
// Inspector is a RPC service for inspecting overlay internals.
//
// architecture: Endpoint
type Inspector struct {

View File

@ -15,7 +15,7 @@ import (
"storj.io/storj/satellite/nodeselection"
)
// CacheDB implements the database for overlay node selection cache
// CacheDB implements the database for overlay node selection cache.
//
// architecture: Database
type CacheDB interface {

View File

@ -10,7 +10,7 @@ import (
"storj.io/common/storj"
)
// PeerIdentities stores storagenode peer identities
// PeerIdentities stores storagenode peer identities.
//
// architecture: Database
type PeerIdentities interface {

View File

@ -163,8 +163,6 @@ func TestEnsureMinimumRequested(t *testing.T) {
satellite.Audit.Chore.Loop.Pause()
satellite.Repair.Checker.Loop.Pause()
satellite.Repair.Repairer.Loop.Pause()
satellite.DowntimeTracking.DetectionChore.Loop.Pause()
satellite.DowntimeTracking.EstimationChore.Loop.Pause()
for _, node := range planet.StorageNodes {
node.Contact.Chore.Pause(ctx)
}

View File

@ -36,7 +36,7 @@ var ErrNodeFinishedGE = errs.Class("node finished graceful exit")
// ErrNotEnoughNodes is when selecting nodes failed with the given parameters.
var ErrNotEnoughNodes = errs.Class("not enough nodes")
// DB implements the database for overlay.Service
// DB implements the database for overlay.Service.
//
// architecture: Database
type DB interface {
@ -265,7 +265,7 @@ func (node *SelectedNode) Clone() *SelectedNode {
}
}
// Service is used to store and handle node information
// Service is used to store and handle node information.
//
// architecture: Service
type Service struct {

View File

@ -404,8 +404,6 @@ func TestGetOnlineNodesForGetDelete(t *testing.T) {
planet.Satellites[0].Audit.Chore.Loop.Pause()
planet.Satellites[0].Repair.Checker.Loop.Pause()
planet.Satellites[0].Repair.Repairer.Loop.Pause()
planet.Satellites[0].DowntimeTracking.DetectionChore.Loop.Pause()
planet.Satellites[0].DowntimeTracking.EstimationChore.Loop.Pause()
for _, node := range planet.StorageNodes {
node.Contact.Chore.Pause(ctx)
}

View File

@ -12,7 +12,6 @@ import (
"storj.io/common/identity"
"storj.io/private/debug"
"storj.io/storj/pkg/server"
"storj.io/storj/private/migrate"
version_checker "storj.io/storj/private/version/checker"
"storj.io/storj/satellite/accounting"
"storj.io/storj/satellite/accounting/live"
@ -28,7 +27,6 @@ import (
"storj.io/storj/satellite/console/consoleweb"
"storj.io/storj/satellite/contact"
"storj.io/storj/satellite/dbcleanup"
"storj.io/storj/satellite/downtime"
"storj.io/storj/satellite/gc"
"storj.io/storj/satellite/gracefulexit"
"storj.io/storj/satellite/mailservice"
@ -48,7 +46,6 @@ import (
"storj.io/storj/satellite/repair/repairer"
"storj.io/storj/satellite/revocation"
"storj.io/storj/satellite/rewards"
"storj.io/storj/satellite/satellitedb/dbx"
"storj.io/storj/satellite/snopayout"
)
@ -58,7 +55,7 @@ func init() {
hw.Register(monkit.Default)
}
// DB is the master database for the satellite
// DB is the master database for the satellite.
//
// architecture: Master Database
type DB interface {
@ -71,12 +68,6 @@ type DB interface {
// TestingMigrateToLatest initializes the database for testplanet.
TestingMigrateToLatest(ctx context.Context) error
// MigrationTestingDefaultDB assists in testing migrations themselves
// against the default database.
MigrationTestingDefaultDB() interface {
TestDBAccess() *dbx.DB
PostgresMigration() *migrate.Migration
}
// PeerIdentities returns a storage for peer identities
PeerIdentities() overlay.PeerIdentities
@ -106,8 +97,6 @@ type DB interface {
GracefulExit() gracefulexit.DB
// StripeCoinPayments returns stripecoinpayments database.
StripeCoinPayments() stripecoinpayments.DB
// DowntimeTracking returns database for downtime tracking
DowntimeTracking() downtime.DB
// SnoPayout returns database for payout.
SnoPayout() snopayout.DB
// Compensation tracks storage node compensation
@ -164,8 +153,6 @@ type Config struct {
Metrics metrics.Config
Downtime downtime.Config
Compensation compensation.Config
ProjectLimit accounting.ProjectLimitConfig

View File

@ -31,7 +31,7 @@ type Config struct {
ReferralManagerURL storj.NodeURL `help:"the URL for referral manager"`
}
// Service allows communicating with the Referral Manager
// Service allows communicating with the Referral Manager.
//
// architecture: Service
type Service struct {

View File

@ -43,7 +43,7 @@ type durabilityStats struct {
remoteSegmentsOverThreshold [5]int64
}
// Checker contains the information needed to do checks for missing pieces
// Checker contains the information needed to do checks for missing pieces.
//
// architecture: Chore
type Checker struct {
@ -243,7 +243,7 @@ func (checker *Checker) updateIrreparableSegmentStatus(ctx context.Context, poin
var _ metainfo.Observer = (*checkerObserver)(nil)
// checkerObserver implements the metainfo loop Observer interface
// checkerObserver implements the metainfo loop Observer interface.
//
// architecture: Observer
type checkerObserver struct {

View File

@ -15,7 +15,7 @@ var (
mon = monkit.Package()
)
// Inspector is a RPC service for inspecting irreparable internals
// Inspector is a RPC service for inspecting irreparable internals.
//
// architecture: Endpoint
type Inspector struct {

View File

@ -8,7 +8,6 @@ import (
"time"
"storj.io/storj/satellite/internalpb"
"storj.io/storj/satellite/satellitedb/dbx"
)
// RepairQueue implements queueing for segments that need repairing.
@ -29,7 +28,6 @@ type RepairQueue interface {
// Count counts the number of segments in the repair queue.
Count(ctx context.Context) (count int, err error)
// TestDBAccess returns raw RepairQueue database access for test purposes.
// TODO: remove dependency.
TestDBAccess() *dbx.DB
// TestingSetAttemptedTime sets attempted time for a repairpath.
TestingSetAttemptedTime(ctx context.Context, repairpath []byte, t time.Time) (rowsAffected int64, err error)
}

View File

@ -4,7 +4,6 @@
package queue_test
import (
"context"
"math/rand"
"strconv"
"testing"
@ -16,7 +15,6 @@ import (
"storj.io/common/testcontext"
"storj.io/storj/satellite"
"storj.io/storj/satellite/internalpb"
"storj.io/storj/satellite/satellitedb/dbx"
"storj.io/storj/satellite/satellitedb/satellitedbtest"
"storj.io/storj/storage"
)
@ -68,32 +66,19 @@ func TestOrder(t *testing.T) {
require.False(t, alreadyInserted)
}
// TODO: remove dependency on *dbx.DB
dbAccess := db.RepairQueue().TestDBAccess()
err := dbAccess.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error {
updateList := []struct {
path []byte
attempted time.Time
}{
{recentRepairPath, time.Now()},
{oldRepairPath, time.Now().Add(-7 * time.Hour)},
{olderRepairPath, time.Now().Add(-8 * time.Hour)},
}
for _, item := range updateList {
res, err := tx.Tx.ExecContext(ctx, dbAccess.Rebind(`UPDATE injuredsegments SET attempted = ? WHERE path = ?`), item.attempted, item.path)
if err != nil {
return err
}
count, err := res.RowsAffected()
if err != nil {
return err
}
require.EqualValues(t, 1, count)
}
return nil
})
require.NoError(t, err)
updateList := []struct {
path []byte
attempted time.Time
}{
{recentRepairPath, time.Now()},
{oldRepairPath, time.Now().Add(-7 * time.Hour)},
{olderRepairPath, time.Now().Add(-8 * time.Hour)},
}
for _, item := range updateList {
rowsAffected, err := db.RepairQueue().TestingSetAttemptedTime(ctx, item.path, item.attempted)
require.NoError(t, err)
require.EqualValues(t, 1, rowsAffected)
}
// path with attempted = null should be selected first
injuredSeg, err := repairQueue.Select(ctx)
@ -132,9 +117,6 @@ func TestOrderHealthyPieces(t *testing.T) {
// ("path/g", 10, null)
// ("path/h", 10, now-8h)
// TODO: remove dependency on *dbx.DB
dbAccess := db.RepairQueue().TestDBAccess()
// insert the 8 segments according to the plan above
injuredSegList := []struct {
path []byte
@ -164,11 +146,9 @@ func TestOrderHealthyPieces(t *testing.T) {
// next, if applicable, update the "attempted at" timestamp
if !item.attempted.IsZero() {
res, err := dbAccess.ExecContext(ctx, dbAccess.Rebind(`UPDATE injuredsegments SET attempted = ? WHERE path = ?`), item.attempted, item.path)
rowsAffected, err := db.RepairQueue().TestingSetAttemptedTime(ctx, item.path, item.attempted)
require.NoError(t, err)
count, err := res.RowsAffected()
require.NoError(t, err)
require.EqualValues(t, 1, count)
require.EqualValues(t, 1, rowsAffected)
}
}

View File

@ -33,8 +33,7 @@ import (
// the numbers of nodes determined by the upload repair max threshold
// - Shuts down several nodes, but keeping up a number equal to the minimum
// threshold
// - Downloads the data from those left nodes and check that it's the same than
// the uploaded one
// - Downloads the data from the remaining nodes and checks that it's the same as the uploaded data.
func TestDataRepairInMemory(t *testing.T) {
testDataRepair(t, true)
}
@ -894,10 +893,10 @@ func testRepairMultipleDisqualifiedAndSuspended(t *testing.T, inMemoryRepair boo
}
// TestDataRepairOverride_HigherLimit does the following:
// - Uploads test data
// - Kills nodes to fall to the Repair Override Value of the checker but stays above the original Repair Threshold
// - Triggers data repair, which attempts to repair the data from the remaining nodes to
// the numbers of nodes determined by the upload repair max threshold
// - Uploads test data
// - Kills nodes to fall to the Repair Override Value of the checker but stays above the original Repair Threshold
// - Triggers data repair, which attempts to repair the data from the remaining nodes to
// the numbers of nodes determined by the upload repair max threshold
func TestDataRepairOverride_HigherLimitInMemory(t *testing.T) {
testDataRepairOverrideHigherLimit(t, true)
}
@ -988,12 +987,12 @@ func testDataRepairOverrideHigherLimit(t *testing.T, inMemoryRepair bool) {
}
// TestDataRepairOverride_LowerLimit does the following:
// - Uploads test data
// - Kills nodes to fall to the Repair Threshold of the checker that should not trigger repair any longer
// - Starts Checker and Repairer and ensures this is the case.
// - Kills more nodes to fall to the Override Value to trigger repair
// - Triggers data repair, which attempts to repair the data from the remaining nodes to
// the numbers of nodes determined by the upload repair max threshold
// - Uploads test data
// - Kills nodes to fall to the Repair Threshold of the checker that should not trigger repair any longer
// - Starts Checker and Repairer and ensures this is the case.
// - Kills more nodes to fall to the Override Value to trigger repair
// - Triggers data repair, which attempts to repair the data from the remaining nodes to
// the numbers of nodes determined by the upload repair max threshold
func TestDataRepairOverride_LowerLimitInMemory(t *testing.T) {
testDataRepairOverrideLowerLimit(t, true)
}
@ -1112,12 +1111,12 @@ func testDataRepairOverrideLowerLimit(t *testing.T, inMemoryRepair bool) {
}
// TestDataRepairUploadLimits does the following:
// - Uploads test data to nodes
// - Get one segment of that data to check in which nodes its pieces are stored
// - Kills as many nodes as needed which store such segment pieces
// - Triggers data repair
// - Verify that the number of pieces which repaired has uploaded don't overpass
// the established limit (success threshold + % of excess)
// - Uploads test data to nodes
// - Get one segment of that data to check in which nodes its pieces are stored
// - Kills as many nodes as needed which store such segment pieces
// - Triggers data repair
// - Verify that the number of pieces the repairer has uploaded doesn't exceed
// the established limit (success threshold + % of excess)
func TestDataRepairUploadLimitInMemory(t *testing.T) {
testDataRepairUploadLimit(t, true)
}

View File

@ -38,7 +38,7 @@ type Config struct {
InMemoryRepair bool `help:"whether to download pieces for repair in memory (true) or download to disk (false)" default:"false"`
}
// Service contains the information needed to run the repair service
// Service contains the information needed to run the repair service.
//
// architecture: Worker
type Service struct {

View File

@ -19,7 +19,7 @@ var (
ErrOfferNotExist = errs.Class("no current offer")
)
// DB holds information about offer
// DB holds information about offers.
//
// architecture: Database
type DB interface {

View File

@ -11,7 +11,6 @@ import (
"storj.io/common/macaroon"
"storj.io/common/storj"
"storj.io/common/uuid"
"storj.io/storj/satellite/metainfo"
"storj.io/storj/satellite/metainfo/metabase"
"storj.io/storj/satellite/satellitedb/dbx"
)
@ -20,11 +19,6 @@ type bucketsDB struct {
db *satelliteDB
}
// Buckets returns database for interacting with buckets.
func (db *satelliteDB) Buckets() metainfo.BucketsDB {
return &bucketsDB{db: db}
}
// CreateBucket creates a new bucket.
func (db *bucketsDB) CreateBucket(ctx context.Context, bucket storj.Bucket) (_ storj.Bucket, err error) {
defer mon.Task()(&ctx)(&err)

View File

@ -21,7 +21,6 @@ import (
"storj.io/storj/satellite/audit"
"storj.io/storj/satellite/compensation"
"storj.io/storj/satellite/console"
"storj.io/storj/satellite/downtime"
"storj.io/storj/satellite/gracefulexit"
"storj.io/storj/satellite/metainfo"
"storj.io/storj/satellite/nodeapiversion"
@ -67,6 +66,7 @@ type satelliteDB struct {
// Options includes options for how a satelliteDB runs.
type Options struct {
ApplicationName string
APIKeysLRUOptions cache.Options
RevocationLRUOptions cache.Options
@ -122,7 +122,10 @@ func open(ctx context.Context, log *zap.Logger, databaseURL string, opts Options
return nil, Error.New("unsupported driver %q", driver)
}
source = pgutil.CheckApplicationName(source)
source, err = pgutil.CheckApplicationName(source, opts.ApplicationName)
if err != nil {
return nil, err
}
dbxDB, err := dbx.Open(driver, source)
if err != nil {
@ -265,11 +268,6 @@ func (dbc *satelliteDBCollection) StripeCoinPayments() stripecoinpayments.DB {
return &stripeCoinPaymentsDB{db: dbc.getByName("stripecoinpayments")}
}
// DowntimeTracking returns database for downtime tracking.
func (dbc *satelliteDBCollection) DowntimeTracking() downtime.DB {
return &downtimeTrackingDB{db: dbc.getByName("downtimetracking")}
}
// SnoPayout returns database for storagenode payStubs and payments info.
func (dbc *satelliteDBCollection) SnoPayout() snopayout.DB {
return &paymentStubs{db: dbc.getByName("snopayout")}

View File

@ -10661,7 +10661,7 @@ func (obj *pgxImpl) Limited_Irreparabledb_By_Segmentpath_Greater_OrderBy_Asc_Seg
obj.logStmt(__stmt, __values...)
for {
rows, err := func() (rows []*Irreparabledb, err error) {
rows, err = func() (rows []*Irreparabledb, err error) {
__rows, err := obj.driver.QueryContext(ctx, __stmt, __values...)
if err != nil {
return nil, err
@ -10845,7 +10845,7 @@ func (obj *pgxImpl) Limited_Node_By_Id_GreaterOrEqual_OrderBy_Asc_Id(ctx context
obj.logStmt(__stmt, __values...)
for {
rows, err := func() (rows []*Node, err error) {
rows, err = func() (rows []*Node, err error) {
__rows, err := obj.driver.QueryContext(ctx, __stmt, __values...)
if err != nil {
return nil, err
@ -10935,7 +10935,7 @@ func (obj *pgxImpl) Limited_Node_Id_Node_Address_Node_LastIpPort_Node_LastContac
obj.logStmt(__stmt, __values...)
for {
rows, err := func() (rows []*Id_Address_LastIpPort_LastContactSuccess_LastContactFailure_Row, err error) {
rows, err = func() (rows []*Id_Address_LastIpPort_LastContactSuccess_LastContactFailure_Row, err error) {
__rows, err := obj.driver.QueryContext(ctx, __stmt, __values...)
if err != nil {
return nil, err
@ -11441,7 +11441,7 @@ func (obj *pgxImpl) Limited_Project_By_CreatedAt_Less_OrderBy_Asc_CreatedAt(ctx
obj.logStmt(__stmt, __values...)
for {
rows, err := func() (rows []*Project, err error) {
rows, err = func() (rows []*Project, err error) {
__rows, err := obj.driver.QueryContext(ctx, __stmt, __values...)
if err != nil {
return nil, err
@ -11535,7 +11535,7 @@ func (obj *pgxImpl) Limited_ProjectMember_By_ProjectId(ctx context.Context,
obj.logStmt(__stmt, __values...)
for {
rows, err := func() (rows []*ProjectMember, err error) {
rows, err = func() (rows []*ProjectMember, err error) {
__rows, err := obj.driver.QueryContext(ctx, __stmt, __values...)
if err != nil {
return nil, err
@ -11944,7 +11944,7 @@ func (obj *pgxImpl) First_BucketStorageTally_By_ProjectId_OrderBy_Desc_IntervalS
obj.logStmt(__stmt, __values...)
for {
bucket_storage_tally, err := func() (bucket_storage_tally *BucketStorageTally, err error) {
bucket_storage_tally, err = func() (bucket_storage_tally *BucketStorageTally, err error) {
__rows, err := obj.driver.QueryContext(ctx, __stmt, __values...)
if err != nil {
return nil, err
@ -12665,7 +12665,7 @@ func (obj *pgxImpl) Limited_BucketMetainfo_By_ProjectId_And_Name_GreaterOrEqual_
obj.logStmt(__stmt, __values...)
for {
rows, err := func() (rows []*BucketMetainfo, err error) {
rows, err = func() (rows []*BucketMetainfo, err error) {
__rows, err := obj.driver.QueryContext(ctx, __stmt, __values...)
if err != nil {
return nil, err
@ -12715,7 +12715,7 @@ func (obj *pgxImpl) Limited_BucketMetainfo_By_ProjectId_And_Name_Greater_OrderBy
obj.logStmt(__stmt, __values...)
for {
rows, err := func() (rows []*BucketMetainfo, err error) {
rows, err = func() (rows []*BucketMetainfo, err error) {
__rows, err := obj.driver.QueryContext(ctx, __stmt, __values...)
if err != nil {
return nil, err
@ -12901,7 +12901,7 @@ func (obj *pgxImpl) Limited_StripeCustomer_By_CreatedAt_LessOrEqual_OrderBy_Desc
obj.logStmt(__stmt, __values...)
for {
rows, err := func() (rows []*StripeCustomer, err error) {
rows, err = func() (rows []*StripeCustomer, err error) {
__rows, err := obj.driver.QueryContext(ctx, __stmt, __values...)
if err != nil {
return nil, err
@ -12996,7 +12996,7 @@ func (obj *pgxImpl) Limited_CoinpaymentsTransaction_By_CreatedAt_LessOrEqual_And
obj.logStmt(__stmt, __values...)
for {
rows, err := func() (rows []*CoinpaymentsTransaction, err error) {
rows, err = func() (rows []*CoinpaymentsTransaction, err error) {
__rows, err := obj.driver.QueryContext(ctx, __stmt, __values...)
if err != nil {
return nil, err
@ -13071,7 +13071,7 @@ func (obj *pgxImpl) Limited_StripecoinpaymentsInvoiceProjectRecord_By_PeriodStar
obj.logStmt(__stmt, __values...)
for {
rows, err := func() (rows []*StripecoinpaymentsInvoiceProjectRecord, err error) {
rows, err = func() (rows []*StripecoinpaymentsInvoiceProjectRecord, err error) {
__rows, err := obj.driver.QueryContext(ctx, __stmt, __values...)
if err != nil {
return nil, err
@ -13301,7 +13301,7 @@ func (obj *pgxImpl) Limited_Coupon_By_CreatedAt_LessOrEqual_And_Status_OrderBy_D
obj.logStmt(__stmt, __values...)
for {
rows, err := func() (rows []*Coupon, err error) {
rows, err = func() (rows []*Coupon, err error) {
__rows, err := obj.driver.QueryContext(ctx, __stmt, __values...)
if err != nil {
return nil, err
@ -13350,7 +13350,7 @@ func (obj *pgxImpl) Limited_CouponUsage_By_Period_And_Status_Equal_Number(ctx co
obj.logStmt(__stmt, __values...)
for {
rows, err := func() (rows []*CouponUsage, err error) {
rows, err = func() (rows []*CouponUsage, err error) {
__rows, err := obj.driver.QueryContext(ctx, __stmt, __values...)
if err != nil {
return nil, err
@ -17448,7 +17448,7 @@ func (obj *pgxcockroachImpl) Limited_Irreparabledb_By_Segmentpath_Greater_OrderB
obj.logStmt(__stmt, __values...)
for {
rows, err := func() (rows []*Irreparabledb, err error) {
rows, err = func() (rows []*Irreparabledb, err error) {
__rows, err := obj.driver.QueryContext(ctx, __stmt, __values...)
if err != nil {
return nil, err
@ -17632,7 +17632,7 @@ func (obj *pgxcockroachImpl) Limited_Node_By_Id_GreaterOrEqual_OrderBy_Asc_Id(ct
obj.logStmt(__stmt, __values...)
for {
rows, err := func() (rows []*Node, err error) {
rows, err = func() (rows []*Node, err error) {
__rows, err := obj.driver.QueryContext(ctx, __stmt, __values...)
if err != nil {
return nil, err
@ -17722,7 +17722,7 @@ func (obj *pgxcockroachImpl) Limited_Node_Id_Node_Address_Node_LastIpPort_Node_L
obj.logStmt(__stmt, __values...)
for {
rows, err := func() (rows []*Id_Address_LastIpPort_LastContactSuccess_LastContactFailure_Row, err error) {
rows, err = func() (rows []*Id_Address_LastIpPort_LastContactSuccess_LastContactFailure_Row, err error) {
__rows, err := obj.driver.QueryContext(ctx, __stmt, __values...)
if err != nil {
return nil, err
@ -18228,7 +18228,7 @@ func (obj *pgxcockroachImpl) Limited_Project_By_CreatedAt_Less_OrderBy_Asc_Creat
obj.logStmt(__stmt, __values...)
for {
rows, err := func() (rows []*Project, err error) {
rows, err = func() (rows []*Project, err error) {
__rows, err := obj.driver.QueryContext(ctx, __stmt, __values...)
if err != nil {
return nil, err
@ -18322,7 +18322,7 @@ func (obj *pgxcockroachImpl) Limited_ProjectMember_By_ProjectId(ctx context.Cont
obj.logStmt(__stmt, __values...)
for {
rows, err := func() (rows []*ProjectMember, err error) {
rows, err = func() (rows []*ProjectMember, err error) {
__rows, err := obj.driver.QueryContext(ctx, __stmt, __values...)
if err != nil {
return nil, err
@ -18731,7 +18731,7 @@ func (obj *pgxcockroachImpl) First_BucketStorageTally_By_ProjectId_OrderBy_Desc_
obj.logStmt(__stmt, __values...)
for {
bucket_storage_tally, err := func() (bucket_storage_tally *BucketStorageTally, err error) {
bucket_storage_tally, err = func() (bucket_storage_tally *BucketStorageTally, err error) {
__rows, err := obj.driver.QueryContext(ctx, __stmt, __values...)
if err != nil {
return nil, err
@ -19452,7 +19452,7 @@ func (obj *pgxcockroachImpl) Limited_BucketMetainfo_By_ProjectId_And_Name_Greate
obj.logStmt(__stmt, __values...)
for {
rows, err := func() (rows []*BucketMetainfo, err error) {
rows, err = func() (rows []*BucketMetainfo, err error) {
__rows, err := obj.driver.QueryContext(ctx, __stmt, __values...)
if err != nil {
return nil, err
@ -19502,7 +19502,7 @@ func (obj *pgxcockroachImpl) Limited_BucketMetainfo_By_ProjectId_And_Name_Greate
obj.logStmt(__stmt, __values...)
for {
rows, err := func() (rows []*BucketMetainfo, err error) {
rows, err = func() (rows []*BucketMetainfo, err error) {
__rows, err := obj.driver.QueryContext(ctx, __stmt, __values...)
if err != nil {
return nil, err
@ -19688,7 +19688,7 @@ func (obj *pgxcockroachImpl) Limited_StripeCustomer_By_CreatedAt_LessOrEqual_Ord
obj.logStmt(__stmt, __values...)
for {
rows, err := func() (rows []*StripeCustomer, err error) {
rows, err = func() (rows []*StripeCustomer, err error) {
__rows, err := obj.driver.QueryContext(ctx, __stmt, __values...)
if err != nil {
return nil, err
@ -19783,7 +19783,7 @@ func (obj *pgxcockroachImpl) Limited_CoinpaymentsTransaction_By_CreatedAt_LessOr
obj.logStmt(__stmt, __values...)
for {
rows, err := func() (rows []*CoinpaymentsTransaction, err error) {
rows, err = func() (rows []*CoinpaymentsTransaction, err error) {
__rows, err := obj.driver.QueryContext(ctx, __stmt, __values...)
if err != nil {
return nil, err
@ -19858,7 +19858,7 @@ func (obj *pgxcockroachImpl) Limited_StripecoinpaymentsInvoiceProjectRecord_By_P
obj.logStmt(__stmt, __values...)
for {
rows, err := func() (rows []*StripecoinpaymentsInvoiceProjectRecord, err error) {
rows, err = func() (rows []*StripecoinpaymentsInvoiceProjectRecord, err error) {
__rows, err := obj.driver.QueryContext(ctx, __stmt, __values...)
if err != nil {
return nil, err
@ -20088,7 +20088,7 @@ func (obj *pgxcockroachImpl) Limited_Coupon_By_CreatedAt_LessOrEqual_And_Status_
obj.logStmt(__stmt, __values...)
for {
rows, err := func() (rows []*Coupon, err error) {
rows, err = func() (rows []*Coupon, err error) {
__rows, err := obj.driver.QueryContext(ctx, __stmt, __values...)
if err != nil {
return nil, err
@ -20137,7 +20137,7 @@ func (obj *pgxcockroachImpl) Limited_CouponUsage_By_Period_And_Status_Equal_Numb
obj.logStmt(__stmt, __values...)
for {
rows, err := func() (rows []*CouponUsage, err error) {
rows, err = func() (rows []*CouponUsage, err error) {
__rows, err := obj.driver.QueryContext(ctx, __stmt, __values...)
if err != nil {
return nil, err

View File

@ -18,7 +18,7 @@ First_{{ .Suffix }}({{ ctxarg .Args }})
obj.logStmt(__stmt, __values...)
for {
{{ arg .Row }}, err := func() ({{ param .Row }}, err error) {
{{ arg .Row }}, err = func() ({{ param .Row }}, err error) {
__rows, err := obj.driver.QueryContext(ctx, __stmt, __values...)
if err != nil {
return nil, err

View File

@ -21,7 +21,7 @@ Limited_{{ .Suffix }}({{ ctxarg .Args }}, limit, offset)
obj.logStmt(__stmt, __values...)
for {
rows, err := func() (rows {{ sliceof .Row }}, err error) {
rows, err = func() (rows {{ sliceof .Row }}, err error) {
__rows, err := obj.driver.QueryContext(ctx, __stmt, __values...)
if err != nil {
return nil, err
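
The one-character template change above (`:=` becomes `=`) appears to exist so that the closure's results are assigned to the variables already declared outside the retry loop (the enclosing function's named results in the generated code) instead of to new loop-scoped variables that shadow them; with `:=`, the outer values would never be updated. A small standalone illustration of that pitfall, unrelated to the generated dbx code:

```go
package main

import (
	"errors"
	"fmt"
)

func main() {
	var result string
	var err error

	attempts := 0
	for {
		// With ":=" here, result and err would be new variables scoped to the
		// loop body, and the outer ones would never be set.
		result, err = func() (string, error) {
			attempts++
			if attempts < 2 {
				return "", errors.New("transient failure")
			}
			return "ok", nil
		}()
		if err == nil {
			break
		}
	}
	fmt.Println(result, err) // "ok <nil>"
}
```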

View File

@ -1,51 +0,0 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package satellitedb
import (
"context"
"time"
"storj.io/common/storj"
"storj.io/storj/satellite/satellitedb/dbx"
)
type downtimeTrackingDB struct {
db *satelliteDB
}
// Add adds a record for a particular node ID with the amount of time it has been offline.
func (db *downtimeTrackingDB) Add(ctx context.Context, nodeID storj.NodeID, trackedTime time.Time, timeOffline time.Duration) (err error) {
defer mon.Task()(&ctx)(&err)
_, err = db.db.Create_NodesOfflineTime(ctx,
dbx.NodesOfflineTime_NodeId(nodeID.Bytes()),
dbx.NodesOfflineTime_TrackedAt(trackedTime),
dbx.NodesOfflineTime_Seconds(int(timeOffline.Seconds())),
)
return Error.Wrap(err)
}
// GetOfflineTime gets the total amount of offline time for a node within a certain timeframe.
// "total offline time" is defined as the sum of all offline time windows that begin inside the provided time window.
// An offline time window that began before `begin` but that overlaps with the provided time window is not included.
// An offline time window that begins within the provided time window, but that extends beyond `end` is included.
func (db *downtimeTrackingDB) GetOfflineTime(ctx context.Context, nodeID storj.NodeID, begin, end time.Time) (time.Duration, error) {
offlineEntries, err := db.db.All_NodesOfflineTime_By_NodeId_And_TrackedAt_Greater_And_TrackedAt_LessOrEqual(ctx,
dbx.NodesOfflineTime_NodeId(nodeID.Bytes()),
dbx.NodesOfflineTime_TrackedAt(begin),
dbx.NodesOfflineTime_TrackedAt(end),
)
if err != nil {
return time.Duration(0), Error.Wrap(err)
}
totalSeconds := 0
for _, entry := range offlineEntries {
totalSeconds += entry.Seconds
}
duration := time.Duration(totalSeconds) * time.Second
return duration, nil
}
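
GetOfflineTime above sums the Seconds of every entry whose tracked_at falls after `begin` and at or before `end`; entries outside that range are ignored entirely, even if their offline window overlaps the queried interval. A small worked example of that filter with an in-memory stand-in for the table (the real query does this in SQL):

```go
package main

import (
	"fmt"
	"time"
)

type entry struct {
	trackedAt time.Time
	seconds   int
}

// offlineTime mirrors the query above: sum entries with tracked_at in (begin, end].
func offlineTime(entries []entry, begin, end time.Time) time.Duration {
	total := 0
	for _, e := range entries {
		if e.trackedAt.After(begin) && !e.trackedAt.After(end) {
			total += e.seconds
		}
	}
	return time.Duration(total) * time.Second
}

func main() {
	begin := time.Date(2020, 12, 1, 0, 0, 0, 0, time.UTC)
	entries := []entry{
		{begin.Add(-time.Hour), 600}, // tracked_at before begin: excluded entirely
		{begin.Add(time.Hour), 7200}, // tracked_at inside (begin, end]: counted in full
	}
	fmt.Println(offlineTime(entries, begin, begin.Add(2*time.Hour))) // 2h0m0s
}
```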

View File

@ -25,7 +25,9 @@ import (
"storj.io/storj/private/dbutil/pgtest"
"storj.io/storj/private/dbutil/pgutil"
"storj.io/storj/private/dbutil/tempdb"
"storj.io/storj/private/migrate"
"storj.io/storj/satellite/satellitedb"
"storj.io/storj/satellite/satellitedb/dbx"
)
// loadSnapshots loads all the dbschemas from `testdata/postgres.*`.
@ -132,6 +134,15 @@ func loadSchemaFromSQL(ctx context.Context, connstr, script string) (_ *dbschema
func TestMigratePostgres(t *testing.T) { migrateTest(t, pgtest.PickPostgres(t)) }
func TestMigrateCockroach(t *testing.T) { migrateTest(t, pgtest.PickCockroachAlt(t)) }
type migrationTestingAccess interface {
// MigrationTestingDefaultDB assists in testing migrations themselves
// against the default database.
MigrationTestingDefaultDB() interface {
TestDBAccess() *dbx.DB
PostgresMigration() *migrate.Migration
}
}
func migrateTest(t *testing.T, connStr string) {
t.Parallel()
@ -146,12 +157,12 @@ func migrateTest(t *testing.T, connStr string) {
defer func() { require.NoError(t, tempDB.Close()) }()
// create a new satellitedb connection
db, err := satellitedb.Open(ctx, log, tempDB.ConnStr, satellitedb.Options{})
db, err := satellitedb.Open(ctx, log, tempDB.ConnStr, satellitedb.Options{ApplicationName: "satellite-migration-test"})
require.NoError(t, err)
defer func() { require.NoError(t, db.Close()) }()
// we need raw database access unfortunately
rawdb := db.MigrationTestingDefaultDB().TestDBAccess()
rawdb := db.(migrationTestingAccess).MigrationTestingDefaultDB().TestDBAccess()
snapshots, dbxschema, err := loadSnapshots(ctx, connStr, rawdb.Schema())
require.NoError(t, err)
@ -159,7 +170,7 @@ func migrateTest(t *testing.T, connStr string) {
var finalSchema *dbschema.Schema
// get migration for this database
migrations := db.MigrationTestingDefaultDB().PostgresMigration()
migrations := db.(migrationTestingAccess).MigrationTestingDefaultDB().PostgresMigration()
for i, step := range migrations.Steps {
tag := fmt.Sprintf("#%d - v%d", i, step.Version)
@ -238,7 +249,7 @@ func benchmarkSetup(b *testing.B, connStr string, merged bool) {
defer func() { require.NoError(b, tempDB.Close()) }()
// create a new satellitedb connection
db, err := satellitedb.Open(ctx, log, tempDB.ConnStr, satellitedb.Options{})
db, err := satellitedb.Open(ctx, log, tempDB.ConnStr, satellitedb.Options{ApplicationName: "satellite-migration-test"})
require.NoError(b, err)
defer func() { require.NoError(b, db.Close()) }()

View File

@ -90,7 +90,10 @@ func (cache *overlaycache) SelectStorageNodes(ctx context.Context, totalNeededNo
return nodes, nil
}
func (cache *overlaycache) selectStorageNodesOnce(ctx context.Context, reputableNodeCount, newNodeCount int, criteria *overlay.NodeCriteria, excludedIDs []storj.NodeID, excludedNetworks []string) (reputableNodes, newNodes []*overlay.SelectedNode, err error) {
func (cache *overlaycache) selectStorageNodesOnce(ctx context.Context, reputableNodeCount, newNodeCount int,
criteria *overlay.NodeCriteria, excludedIDs []storj.NodeID,
excludedNetworks []string) (reputableNodes, newNodes []*overlay.SelectedNode, err error) {
defer mon.Task()(&ctx)(&err)
newNodesCondition, err := nodeSelectionCondition(ctx, criteria, excludedIDs, excludedNetworks, true)
@ -171,7 +174,9 @@ func (cache *overlaycache) selectStorageNodesOnce(ctx context.Context, reputable
}
// nodeSelectionCondition creates a condition with arguments that corresponds to the arguments.
func nodeSelectionCondition(ctx context.Context, criteria *overlay.NodeCriteria, excludedIDs []storj.NodeID, excludedNetworks []string, isNewNodeQuery bool) (condition, error) {
func nodeSelectionCondition(ctx context.Context, criteria *overlay.NodeCriteria, excludedIDs []storj.NodeID,
excludedNetworks []string, isNewNodeQuery bool) (condition, error) {
var conds conditions
conds.add(`disqualified IS NULL`)
conds.add(`unknown_audit_suspended IS NULL`)
@ -220,15 +225,15 @@ func nodeSelectionCondition(ctx context.Context, criteria *overlay.NodeCriteria,
return conds.combine(), nil
}
// partialQuery corresponds to a query
// partialQuery corresponds to a query.
//
// distinct=false
// distinct=false
//
// $selection WHERE $condition ORDER BY $orderBy, RANDOM() LIMIT $limit
// $selection WHERE $condition ORDER BY $orderBy, RANDOM() LIMIT $limit
//
// distinct=true
// distinct=true
//
// SELECT * FROM ($selection WHERE $condition ORDER BY $orderBy, RANDOM()) filtered ORDER BY RANDOM() LIMIT $limit
// SELECT * FROM ($selection WHERE $condition ORDER BY $orderBy, RANDOM()) filtered ORDER BY RANDOM() LIMIT $limit
//
type partialQuery struct {
selection string

View File

@ -24,10 +24,6 @@ type repairQueue struct {
db *satelliteDB
}
func (r *repairQueue) TestDBAccess() *dbx.DB {
return r.db.DB
}
func (r *repairQueue) Insert(ctx context.Context, seg *internalpb.InjuredSegment, segmentHealth float64) (alreadyInserted bool, err error) {
defer mon.Task()(&ctx)(&err)
// insert if not exists, or update healthy count if does exist
@ -151,3 +147,14 @@ func (r *repairQueue) Count(ctx context.Context) (count int, err error) {
return count, Error.Wrap(err)
}
// TestingSetAttemptedTime sets attempted time for a repairpath.
func (r *repairQueue) TestingSetAttemptedTime(ctx context.Context, repairpath []byte, t time.Time) (rowsAffected int64, err error) {
defer mon.Task()(&ctx)(&err)
res, err := r.db.ExecContext(ctx, r.db.Rebind(`UPDATE injuredsegments SET attempted = ? WHERE path = ?`), t, repairpath)
if err != nil {
return 0, Error.Wrap(err)
}
count, err := res.RowsAffected()
return count, Error.Wrap(err)
}

View File

@ -122,7 +122,7 @@ func CreateMasterDB(ctx context.Context, log *zap.Logger, name string, category
// CreateMasterDBOnTopOf creates a new satellite database on top of an already existing
// temporary database.
func CreateMasterDBOnTopOf(ctx context.Context, log *zap.Logger, tempDB *dbutil.TempDatabase) (db satellite.DB, err error) {
masterDB, err := satellitedb.Open(ctx, log.Named("db"), tempDB.ConnStr, satellitedb.Options{})
masterDB, err := satellitedb.Open(ctx, log.Named("db"), tempDB.ConnStr, satellitedb.Options{ApplicationName: "satellite-satellitdb-test"})
return &tempMasterDB{DB: masterDB, tempDB: tempDB}, err
}
@ -159,7 +159,7 @@ func CreatePointerDB(ctx context.Context, log *zap.Logger, name string, category
// CreatePointerDBOnTopOf creates a new pointer database on top of an already existing
// temporary database.
func CreatePointerDBOnTopOf(ctx context.Context, log *zap.Logger, tempDB *dbutil.TempDatabase) (db metainfo.PointerDB, err error) {
pointerDB, err := metainfo.OpenStore(ctx, log.Named("pointerdb"), tempDB.ConnStr)
pointerDB, err := metainfo.OpenStore(ctx, log.Named("pointerdb"), tempDB.ConnStr, "satellite-satellitdb-test")
if err != nil {
return nil, err
}

View File

@ -21,7 +21,7 @@ var (
mon = monkit.Package()
)
// Endpoint for querying node stats for the SNO
// Endpoint for querying node stats for the SNO.
//
// architecture: Endpoint
type Endpoint struct {

Some files were not shown because too many files have changed in this diff.