From adb2c83e09b7a2109648ee9cc46adf84af3ed790 Mon Sep 17 00:00:00 2001
From: Jennifer Johnson
Date: Tue, 8 Dec 2020 21:47:10 -0500
Subject: [PATCH 1/7] cmd/uplink: adds register, url, and dns flags to uplink share and replaces access grant with access

uplink share --> creates access grant
uplink share --register --> registers access grant
uplink share --url --> creates URL, implies register and public
uplink share --dns --> creates dns info, implies register and public

Change-Id: I7930c4973a602d3d721ec6f77170f90957dad8c0
---
 cmd/uplink/cmd/access.go |  41 +++---
 cmd/uplink/cmd/share.go  | 311 ++++++++++++++++++++++++++-------------
 2 files changed, 228 insertions(+), 124 deletions(-)

diff --git a/cmd/uplink/cmd/access.go b/cmd/uplink/cmd/access.go
index 9622324c8..e860852ef 100644
--- a/cmd/uplink/cmd/access.go
+++ b/cmd/uplink/cmd/access.go
@@ -153,71 +153,74 @@ func registerAccess(cmd *cobra.Command, args []string) (err error) {
 	if len(args) == 0 {
 		return errs.New("no access specified")
 	}
+	_, err = register(args[0], registerCfg.AuthService, registerCfg.Public)
+	return err
+}
 
-	if registerCfg.AuthService == "" {
-		return errs.New("no auth service address provided")
+func register(accessRaw, authService string, public bool) (accessKey string, err error) {
+	if authService == "" {
+		return "", errs.New("no auth service address provided")
 	}
 
-	accessRaw := args[0]
-
 	// try assuming that accessRaw is a named access
 	access, err := registerCfg.GetNamedAccess(accessRaw)
 	if err == nil && access != nil {
 		accessRaw, err = access.Serialize()
 		if err != nil {
-			return errs.New("error serializing named access '%s': %w", accessRaw, err)
+			return "", errs.New("error serializing named access '%s': %w", accessRaw, err)
 		}
 	}
 
 	postData, err := json.Marshal(map[string]interface{}{
 		"access_grant": accessRaw,
-		"public":       registerCfg.Public,
+		"public":       public,
 	})
 	if err != nil {
-		return errs.Wrap(err)
+		return accessKey, errs.Wrap(err)
 	}
 
-	resp, err := http.Post(fmt.Sprintf("%s/v1/access", registerCfg.AuthService), "application/json", bytes.NewReader(postData))
+	resp, err := http.Post(fmt.Sprintf("%s/v1/access", authService), "application/json", bytes.NewReader(postData))
 	if err != nil {
-		return err
+		return "", err
 	}
 	defer func() { err = errs.Combine(err, resp.Body.Close()) }()
 
 	body, err := ioutil.ReadAll(resp.Body)
 	if err != nil {
-		return err
+		return "", err
 	}
 
 	respBody := make(map[string]string)
 	if err := json.Unmarshal(body, &respBody); err != nil {
-		return errs.New("unexpected response from auth service: %s", string(body))
+		return "", errs.New("unexpected response from auth service: %s", string(body))
 	}
 
 	accessKey, ok := respBody["access_key_id"]
 	if !ok {
-		return errs.New("access_key_id missing in response")
+		return "", errs.New("access_key_id missing in response")
 	}
 	secretKey, ok := respBody["secret_key"]
 	if !ok {
-		return errs.New("secret_key missing in response")
+		return "", errs.New("secret_key missing in response")
 	}
-	fmt.Println("=========== CREDENTIALS =========================================================")
+
+	fmt.Println("========== CREDENTIALS ===================================================================")
 	fmt.Println("Access Key ID: ", accessKey)
-	fmt.Println("Secret Key: ", secretKey)
-	fmt.Println("Endpoint: ", respBody["endpoint"])
+	fmt.Println("Secret Key : ", secretKey)
+	fmt.Println("Endpoint : ", respBody["endpoint"])
 
 	// update AWS credential file if requested
 	if registerCfg.AWSProfile != "" {
 		credentialsPath, err := getAwsCredentialsPath()
 		if err != nil {
-			return err
+			return "", err
 		}
 		err = writeAWSCredentials(credentialsPath, registerCfg.AWSProfile, accessKey, secretKey)
 		if err != nil {
-			return err
+			return "", err
 		}
 	}
-	return nil
+	return accessKey, nil
 }
 
 // getAwsCredentialsPath returns the expected AWS credentials path.
diff --git a/cmd/uplink/cmd/share.go b/cmd/uplink/cmd/share.go
index 11e057d08..ddb27322d 100644
--- a/cmd/uplink/cmd/share.go
+++ b/cmd/uplink/cmd/share.go
@@ -7,8 +7,10 @@ import (
 	"fmt"
 	"io/ioutil"
 	"net/url"
+	"os"
 	"path/filepath"
 	"strings"
+	"text/tabwriter"
 	"time"
 
 	"github.com/spf13/cobra"
@@ -31,7 +33,13 @@ var shareCfg struct {
 	NotAfter          string   `help:"disallow access after this time (e.g. '+2h', '2020-01-02T15:01:01-01:00')" basic-help:"true"`
 	AllowedPathPrefix []string `help:"whitelist of path prefixes to require, overrides the [allowed-path-prefix] arguments"`
 	ExportTo          string   `default:"" help:"path to export the shared access to" basic-help:"true"`
-	BaseURL           string   `default:"https://link.tardigradeshare.io" help:"the base url for link sharing"`
+	BaseURL           string   `default:"https://link.tardigradeshare.io" help:"the base url for link sharing" basic-help:"true"`
+
+	Register    bool   `default:"false" help:"if true, creates and registers access grant" basic-help:"true"`
+	URL         bool   `default:"false" help:"if true, returns a url for the shared path. implies --register and --public" basic-help:"true"`
+	DNS         string `default:"" help:"specify your custom hostname. if set, returns dns settings for web hosting. implies --register and --public" basic-help:"true"`
+	AuthService string `default:"https://auth.tardigradeshare.io" help:"url for shared auth service" basic-help:"true"`
+	Public      bool   `default:"false" help:"if true, the access will be public. --dns and --url override this" basic-help:"true"`
 
 	// Share requires information about the current access
 	AccessConfig
@@ -46,11 +54,195 @@ func init() {
 		Short: "Shares restricted access to objects.",
 		RunE:  shareMain,
 	}
+
 	RootCmd.AddCommand(shareCmd)
 
 	process.Bind(shareCmd, &shareCfg, defaults, cfgstruct.ConfDir(getConfDir()))
 }
 
+func shareMain(cmd *cobra.Command, args []string) (err error) {
+	newAccessData, sharePrefixes, permission, err := createAccessGrant(args)
+	if err != nil {
+		return err
+	}
+
+	var accessKey string
+
+	if shareCfg.Register || shareCfg.URL || shareCfg.DNS != "" {
+		isPublic := (shareCfg.Public || shareCfg.URL || shareCfg.DNS != "")
+		accessKey, err = register(newAccessData, shareCfg.AuthService, isPublic)
+		if err != nil {
+			return err
+		}
+		fmt.Println("Public Access: ", isPublic)
+
+		if len(shareCfg.AllowedPathPrefix) == 1 && !permission.AllowUpload && !permission.AllowDelete {
+			if shareCfg.URL {
+				if err = createURL(accessKey, sharePrefixes); err != nil {
+					return err
+				}
+			}
+			if shareCfg.DNS != "" {
+				if err = createDNS(accessKey); err != nil {
+					return err
+				}
+			}
+		}
+	}
+
+	if shareCfg.ExportTo != "" {
+		// convert to an absolute path, mostly for output purposes.
+		exportTo, err := filepath.Abs(shareCfg.ExportTo)
+		if err != nil {
+			return Error.Wrap(err)
+		}
+		if err := ioutil.WriteFile(exportTo, []byte(newAccessData+"\n"), 0600); err != nil {
+			return Error.Wrap(err)
+		}
+		fmt.Println("Exported to:", exportTo)
+	}
+	return nil
+}
+
+// Creates access grant for allowed path prefixes.
+func createAccessGrant(args []string) (newAccessData string, sharePrefixes []sharePrefixExtension, permission uplink.Permission, err error) { + now := time.Now() + notBefore, err := parseHumanDate(shareCfg.NotBefore, now) + if err != nil { + return newAccessData, sharePrefixes, permission, err + } + notAfter, err := parseHumanDate(shareCfg.NotAfter, now) + if err != nil { + return newAccessData, sharePrefixes, permission, err + } + + if len(shareCfg.AllowedPathPrefix) == 0 { + // if the --allowed-path-prefix flag is not set, + // use any arguments as allowed path prefixes + for _, arg := range args { + shareCfg.AllowedPathPrefix = append(shareCfg.AllowedPathPrefix, strings.Split(arg, ",")...) + } + } + + var uplinkSharePrefixes []uplink.SharePrefix + for _, path := range shareCfg.AllowedPathPrefix { + p, err := fpath.New(path) + if err != nil { + return newAccessData, sharePrefixes, permission, err + } + if p.IsLocal() { + return newAccessData, sharePrefixes, permission, errs.New("required path must be remote: %q", path) + } + + uplinkSharePrefix := uplink.SharePrefix{ + Bucket: p.Bucket(), + Prefix: p.Path(), + } + sharePrefixes = append(sharePrefixes, sharePrefixExtension{ + uplinkSharePrefix: uplinkSharePrefix, + hasTrailingSlash: strings.HasSuffix(path, "/"), + }) + uplinkSharePrefixes = append(uplinkSharePrefixes, uplinkSharePrefix) + } + + access, err := shareCfg.GetAccess() + if err != nil { + return newAccessData, sharePrefixes, permission, err + } + + permission = uplink.Permission{} + permission.AllowDelete = !shareCfg.DisallowDeletes && !shareCfg.Readonly + permission.AllowList = !shareCfg.DisallowLists && !shareCfg.Writeonly + permission.AllowDownload = !shareCfg.DisallowReads && !shareCfg.Writeonly + permission.AllowUpload = !shareCfg.DisallowWrites && !shareCfg.Readonly + permission.NotBefore = notBefore + permission.NotAfter = notAfter + + newAccess, err := access.Share(permission, uplinkSharePrefixes...) + if err != nil { + return newAccessData, sharePrefixes, permission, err + } + + newAccessData, err = newAccess.Serialize() + if err != nil { + return newAccessData, sharePrefixes, permission, err + } + + satelliteAddr, _, _, err := parseAccess(newAccessData) + if err != nil { + return newAccessData, sharePrefixes, permission, err + } + + fmt.Println("Sharing access to satellite", satelliteAddr) + fmt.Println("=========== ACCESS RESTRICTIONS ==========================================================") + fmt.Println("Download :", formatPermission(permission.AllowDownload)) + fmt.Println("Upload :", formatPermission(permission.AllowUpload)) + fmt.Println("Lists :", formatPermission(permission.AllowList)) + fmt.Println("Deletes :", formatPermission(permission.AllowDelete)) + fmt.Println("NotBefore :", formatTimeRestriction(permission.NotBefore)) + fmt.Println("NotAfter :", formatTimeRestriction(permission.NotAfter)) + fmt.Println("Paths :", formatPaths(sharePrefixes)) + fmt.Println("=========== SERIALIZED ACCESS WITH THE ABOVE RESTRICTIONS TO SHARE WITH OTHERS ===========") + fmt.Println("Access :", newAccessData) + + return newAccessData, sharePrefixes, permission, nil +} + +// Creates linksharing url for allowed path prefixes. 
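+// Only the first allowed path prefix is used; for a non-empty prefix, a
+// trailing slash carries through to the printed URL so recursive shares resolve.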
+func createURL(newAccessData string, sharePrefixes []sharePrefixExtension) (err error) { + p, err := fpath.New(shareCfg.AllowedPathPrefix[0]) + if err != nil { + return err + } + fmt.Println("=========== BROWSER URL ==================================================================") + fmt.Println("REMINDER : Object key must end in '/' when trying to share recursively") + + var printFormat string + if p.Path() == "" || !sharePrefixes[0].hasTrailingSlash { // Check if the path is empty (aka sharing the entire bucket) or the path is not a directory or an object that ends in "/". + printFormat = "URL : %s/%s/%s/%s\n" + } else { + printFormat = "URL : %s/%s/%s/%s/\n" + } + fmt.Printf(printFormat, shareCfg.BaseURL, url.PathEscape(newAccessData), p.Bucket(), p.Path()) + return nil +} + +// Creates dns record info for allowed path prefixes. +func createDNS(accessKey string) (err error) { + p, err := fpath.New(shareCfg.AllowedPathPrefix[0]) + if err != nil { + return err + } + CNAME, err := url.Parse(shareCfg.BaseURL) + if err != nil { + return err + } + + minWidth := len(shareCfg.DNS) + 5 // add 5 spaces to account for "txt-" + w := new(tabwriter.Writer) + w.Init(os.Stdout, minWidth, minWidth, 0, '\t', 0) + defer func() { + err = errs.Combine(err, w.Flush()) + }() + + var printStorjRoot string + if p.Path() == "" { + printStorjRoot = fmt.Sprintf("txt-%s\tIN\tTXT \tstorj-root:%s", shareCfg.DNS, p.Bucket()) + } else { + printStorjRoot = fmt.Sprintf("txt-%s\tIN\tTXT \tstorj-root:%s/%s", shareCfg.DNS, p.Bucket(), p.Path()) + } + + fmt.Println("=========== DNS INFO =====================================================================") + fmt.Println("Remember to update the $ORIGIN with your domain name. You may also change the $TTL.") + fmt.Fprintln(w, "$ORIGIN example.com.") + fmt.Fprintln(w, "$TTL 3600") + fmt.Fprintf(w, "%s \tIN\tCNAME\t%s.\n", shareCfg.DNS, CNAME.Host) + fmt.Fprintln(w, printStorjRoot) + fmt.Fprintf(w, "txt-%s\tIN\tTXT \tstorj-access:%s\n", shareCfg.DNS, accessKey) + + return nil +} + func parseHumanDate(date string, now time.Time) (time.Time, error) { switch { case date == "": @@ -71,105 +263,10 @@ func parseHumanDate(date string, now time.Time) (time.Time, error) { } } -// shareMain is the function executed when shareCmd is called. -func shareMain(cmd *cobra.Command, args []string) (err error) { - now := time.Now() - notBefore, err := parseHumanDate(shareCfg.NotBefore, now) - if err != nil { - return err - } - notAfter, err := parseHumanDate(shareCfg.NotAfter, now) - if err != nil { - return err - } - - if len(shareCfg.AllowedPathPrefix) == 0 { - // if the --allowed-path-prefix flag is not set, - // use any arguments as allowed path prefixes - for _, arg := range args { - shareCfg.AllowedPathPrefix = append(shareCfg.AllowedPathPrefix, strings.Split(arg, ",")...) 
- } - } - - var sharePrefixes []uplink.SharePrefix - for _, path := range shareCfg.AllowedPathPrefix { - p, err := fpath.New(path) - if err != nil { - return err - } - if p.IsLocal() { - return errs.New("required path must be remote: %q", path) - } - - sharePrefixes = append(sharePrefixes, uplink.SharePrefix{ - Bucket: p.Bucket(), - Prefix: p.Path(), - }) - } - - access, err := shareCfg.GetAccess() - if err != nil { - return err - } - - permission := uplink.Permission{} - permission.AllowDelete = !shareCfg.DisallowDeletes && !shareCfg.Readonly - permission.AllowList = !shareCfg.DisallowLists && !shareCfg.Writeonly - permission.AllowDownload = !shareCfg.DisallowReads && !shareCfg.Writeonly - permission.AllowUpload = !shareCfg.DisallowWrites && !shareCfg.Readonly - permission.NotBefore = notBefore - permission.NotAfter = notAfter - - newAccess, err := access.Share(permission, sharePrefixes...) - if err != nil { - return err - } - - newAccessData, err := newAccess.Serialize() - if err != nil { - return err - } - - satelliteAddr, _, _, err := parseAccess(newAccessData) - if err != nil { - return err - } - fmt.Println("Sharing access to satellite", satelliteAddr) - fmt.Println("=========== ACCESS RESTRICTIONS ==========================================================") - fmt.Println("Download :", formatPermission(permission.AllowDownload)) - fmt.Println("Upload :", formatPermission(permission.AllowUpload)) - fmt.Println("Lists :", formatPermission(permission.AllowList)) - fmt.Println("Deletes :", formatPermission(permission.AllowDelete)) - fmt.Println("NotBefore :", formatTimeRestriction(permission.NotBefore)) - fmt.Println("NotAfter :", formatTimeRestriction(permission.NotAfter)) - fmt.Println("Paths :", formatPaths(sharePrefixes)) - fmt.Println("=========== SERIALIZED ACCESS WITH THE ABOVE RESTRICTIONS TO SHARE WITH OTHERS ===========") - fmt.Println("Access :", newAccessData) - - if len(shareCfg.AllowedPathPrefix) == 1 && !permission.AllowUpload && !permission.AllowDelete { - fmt.Println("=========== BROWSER URL ==================================================================") - p, err := fpath.New(shareCfg.AllowedPathPrefix[0]) - if err != nil { - return err - } - fmt.Println("URL :", fmt.Sprintf("%s/%s/%s/%s", shareCfg.BaseURL, - url.PathEscape(newAccessData), - url.PathEscape(p.Bucket()), - url.PathEscape(p.Path()))) - } - - if shareCfg.ExportTo != "" { - // convert to an absolute path, mostly for output purposes. - exportTo, err := filepath.Abs(shareCfg.ExportTo) - if err != nil { - return Error.Wrap(err) - } - if err := ioutil.WriteFile(exportTo, []byte(newAccessData+"\n"), 0600); err != nil { - return Error.Wrap(err) - } - fmt.Println("Exported to:", exportTo) - } - return nil +// sharePrefixExtension is a temporary struct type. We might want to add hasTrailingSlash bool to `uplink.SharePrefix` directly. +type sharePrefixExtension struct { + uplinkSharePrefix uplink.SharePrefix + hasTrailingSlash bool } func formatPermission(allowed bool) string { @@ -186,19 +283,23 @@ func formatTimeRestriction(t time.Time) string { return formatTime(t) } -func formatPaths(sharePrefixes []uplink.SharePrefix) string { +func formatPaths(sharePrefixes []sharePrefixExtension) string { if len(sharePrefixes) == 0 { return "WARNING! The entire project is shared!" 
} var paths []string for _, prefix := range sharePrefixes { - path := "sj://" + prefix.Bucket - if len(prefix.Prefix) == 0 { - path += " (entire bucket)" + path := "sj://" + prefix.uplinkSharePrefix.Bucket + if len(prefix.uplinkSharePrefix.Prefix) == 0 { + path += "/ (entire bucket)" } else { - path += "/" + prefix.Prefix + path += "/" + prefix.uplinkSharePrefix.Prefix + if prefix.hasTrailingSlash { + path += "/" + } } + paths = append(paths, path) } From 2ac72eaf168c8755229b3e8cb0a5fd7ba767fe98 Mon Sep 17 00:00:00 2001 From: Cameron Ayer Date: Fri, 20 Nov 2020 17:20:03 -0500 Subject: [PATCH 2/7] satellite/repair/checker: add new monkit stats tagged with rs scheme There is a new checker field called statsCollector. This contains a map of stats pointers where the key is a stringified redundancy scheme. stats contains all tagged monkit metrics. These metrics exist under the key name, "tagged_repair_stats", which is tagged with the name of each metric and a corresponding rs scheme. As the metainfo observer works on a segment, it checks statsCollector for a stats corresponding to the segment's redundancy scheme. If one doesn't exist, it is created and chained to the monkit scope. Now we can call Observe, Inc, etc on the fields just like before, and they have tags! durabilityStats has also been renamed to aggregateStats. At the end of the metainfo loop, we insert the aggregateStats totals into the corresponding stats fields for metric reporting. Change-Id: I8aa1918351d246a8ef818b9712ed4cb39d1ea9c6 --- satellite/repair/checker/checker.go | 99 +++++++++----- satellite/repair/checker/checkerstats.go | 166 +++++++++++++++++++++++ satellite/repair/checker/config.go | 8 +- 3 files changed, 238 insertions(+), 35 deletions(-) create mode 100644 satellite/repair/checker/checkerstats.go diff --git a/satellite/repair/checker/checker.go b/satellite/repair/checker/checker.go index 993b8f6d3..04d1c1efe 100644 --- a/satellite/repair/checker/checker.go +++ b/satellite/repair/checker/checker.go @@ -14,6 +14,7 @@ import ( "storj.io/common/errs2" "storj.io/common/pb" + "storj.io/common/storj" "storj.io/common/sync2" "storj.io/storj/satellite/internalpb" "storj.io/storj/satellite/metainfo" @@ -30,19 +31,6 @@ var ( mon = monkit.Package() ) -// durabilityStats remote segment information. -type durabilityStats struct { - objectsChecked int64 - remoteSegmentsChecked int64 - remoteSegmentsNeedingRepair int64 - newRemoteSegmentsNeedingRepair int64 - remoteSegmentsLost int64 - remoteSegmentsFailedToCheck int64 - remoteSegmentInfo []metabase.ObjectLocation - // remoteSegmentsOverThreshold[0]=# of healthy=rt+1, remoteSegmentsOverThreshold[1]=# of healthy=rt+2, etc... - remoteSegmentsOverThreshold [5]int64 -} - // Checker contains the information needed to do checks for missing pieces. 
 //
 // architecture: Chore
@@ -53,6 +41,7 @@ type Checker struct {
 	metainfo        *metainfo.Service
 	metaLoop        *metainfo.Loop
 	nodestate       *ReliabilityCache
+	statsCollector  *statsCollector
 	repairOverrides RepairOverridesMap
 	nodeFailureRate float64
 	Loop            *sync2.Cycle
@@ -69,6 +58,7 @@ func NewChecker(logger *zap.Logger, repairQueue queue.RepairQueue, irrdb irrepar
 		metainfo:        metainfo,
 		metaLoop:        metaLoop,
 		nodestate:       NewReliabilityCache(overlay, config.ReliabilityCacheStaleness),
+		statsCollector:  newStatsCollector(),
 		repairOverrides: config.RepairOverrides.GetMap(),
 		nodeFailureRate: config.NodeFailureRate,
 
@@ -115,7 +105,8 @@ func (checker *Checker) IdentifyInjuredSegments(ctx context.Context) (err error)
 		repairQueue:     checker.repairQueue,
 		irrdb:           checker.irrdb,
 		nodestate:       checker.nodestate,
-		monStats:        durabilityStats{},
+		statsCollector:  checker.statsCollector,
+		monStats:        aggregateStats{},
 		repairOverrides: checker.repairOverrides,
 		nodeFailureRate: checker.nodeFailureRate,
 		log:             checker.logger,
@@ -134,6 +125,8 @@ func (checker *Checker) IdentifyInjuredSegments(ctx context.Context) (err error)
 		return Error.Wrap(err)
 	}
 
+	checker.statsCollector.collectAggregates()
+
 	mon.IntVal("remote_files_checked").Observe(observer.monStats.objectsChecked) //mon:locked
 	mon.IntVal("remote_segments_checked").Observe(observer.monStats.remoteSegmentsChecked) //mon:locked
 	mon.IntVal("remote_segments_failed_to_check").Observe(observer.monStats.remoteSegmentsFailedToCheck) //mon:locked
@@ -250,12 +243,27 @@ type checkerObserver struct {
 	repairQueue     queue.RepairQueue
 	irrdb           irreparable.DB
 	nodestate       *ReliabilityCache
-	monStats        durabilityStats
+	statsCollector  *statsCollector
+	monStats        aggregateStats // TODO(cam): once we verify statsCollector reports data correctly, remove this
 	repairOverrides RepairOverridesMap
 	nodeFailureRate float64
 	log             *zap.Logger
 }
 
+func (obs *checkerObserver) getStatsByRS(redundancy storj.RedundancyScheme) *stats {
+	rsString := getRSString(obs.loadRedundancy(redundancy))
+	return obs.statsCollector.getStatsByRS(rsString)
+}
+
+func (obs *checkerObserver) loadRedundancy(redundancy storj.RedundancyScheme) (int, int, int, int) {
+	repair := int(redundancy.RepairShares)
+	overrideValue := obs.repairOverrides.GetOverrideValue(redundancy)
+	if overrideValue != 0 {
+		repair = int(overrideValue)
+	}
+	return int(redundancy.RequiredShares), repair, int(redundancy.OptimalShares), int(redundancy.TotalShares)
+}
+
 func (obs *checkerObserver) RemoteSegment(ctx context.Context, segment *metainfo.Segment) (err error) {
 	defer mon.Task()(&ctx)(&err)
 
@@ -264,10 +272,14 @@ func (obs *checkerObserver) RemoteSegment(ctx context.Context, segment *metainfo
 		return nil
 	}
 
+	stats := obs.getStatsByRS(segment.Redundancy)
+
 	obs.monStats.remoteSegmentsChecked++
+	stats.iterationAggregates.remoteSegmentsChecked++
 
 	// ensure we get values, even if only zero values, so that redash can have an alert based on this
 	mon.Counter("checker_segments_below_min_req").Inc(0) //mon:locked
+	stats.segmentsBelowMinReq.Inc(0)
 
 	pieces := segment.Pieces
 	if len(pieces) == 0 {
@@ -287,26 +299,25 @@ func (obs *checkerObserver) RemoteSegment(ctx context.Context, segment *metainfo
 	missingPieces, err := obs.nodestate.MissingPieces(ctx, segment.CreationDate, pbPieces)
 	if err != nil {
 		obs.monStats.remoteSegmentsFailedToCheck++
+		stats.iterationAggregates.remoteSegmentsFailedToCheck++
 		return errs.Combine(Error.New("error getting missing pieces"), err)
 	}
 
 	numHealthy := len(pieces) - len(missingPieces)
-	mon.IntVal("checker_segment_total_count").Observe(int64(len(pieces))) //mon:locked
+	mon.IntVal("checker_segment_total_count").Observe(int64(len(pieces))) //mon:locked
+	stats.segmentTotalCount.Observe(int64(len(pieces)))
 	mon.IntVal("checker_segment_healthy_count").Observe(int64(numHealthy)) //mon:locked
+	stats.segmentHealthyCount.Observe(int64(numHealthy))
 
 	segmentAge := time.Since(segment.CreationDate)
 	mon.IntVal("checker_segment_age").Observe(int64(segmentAge.Seconds())) //mon:locked
+	stats.segmentAge.Observe(int64(segmentAge.Seconds()))
+
+	required, repairThreshold, successThreshold, _ := obs.loadRedundancy(segment.Redundancy)
 
-	redundancy := segment.Redundancy
-	required := int(redundancy.RequiredShares)
-	repairThreshold := int(redundancy.RepairShares)
-	overrideValue := obs.repairOverrides.GetOverrideValue(redundancy)
-	if overrideValue != 0 {
-		repairThreshold = int(overrideValue)
-	}
-	successThreshold := int(redundancy.OptimalShares)
 	segmentHealth := repair.SegmentHealth(numHealthy, required, obs.nodeFailureRate)
 	mon.FloatVal("checker_segment_health").Observe(segmentHealth) //mon:locked
+	stats.segmentHealth.Observe(segmentHealth)
 
 	key := segment.Location.Encode()
 	// we repair when the number of healthy pieces is less than or equal to the repair threshold and is greater or equal to
@@ -314,7 +325,9 @@ func (obs *checkerObserver) RemoteSegment(ctx context.Context, segment *metainfo
 	// except for the case when the repair and success thresholds are the same (a case usually seen during testing)
 	if numHealthy >= required && numHealthy <= repairThreshold && numHealthy < successThreshold {
 		mon.FloatVal("checker_injured_segment_health").Observe(segmentHealth) //mon:locked
+		stats.injuredSegmentHealth.Observe(segmentHealth)
 		obs.monStats.remoteSegmentsNeedingRepair++
+		stats.iterationAggregates.remoteSegmentsNeedingRepair++
 		alreadyInserted, err := obs.repairQueue.Insert(ctx, &internalpb.InjuredSegment{
 			Path:         key,
 			LostPieces:   missingPieces,
@@ -327,6 +340,7 @@ func (obs *checkerObserver) RemoteSegment(ctx context.Context, segment *metainfo
 
 		if !alreadyInserted {
 			obs.monStats.newRemoteSegmentsNeedingRepair++
+			stats.iterationAggregates.newRemoteSegmentsNeedingRepair++
 		}
 
 		// delete always returns nil when something was deleted and also when element didn't exists
@@ -340,6 +354,9 @@ func (obs *checkerObserver) RemoteSegment(ctx context.Context, segment *metainfo
 			if !containsObjectLocation(obs.monStats.remoteSegmentInfo, lostSegInfo) {
 				obs.monStats.remoteSegmentInfo = append(obs.monStats.remoteSegmentInfo, lostSegInfo)
 			}
+			if !containsObjectLocation(stats.iterationAggregates.remoteSegmentInfo, lostSegInfo) {
+				stats.iterationAggregates.remoteSegmentInfo = append(stats.iterationAggregates.remoteSegmentInfo, lostSegInfo)
+			}
 
 			var segmentAge time.Duration
 			if segment.CreationDate.Before(segment.LastRepaired) {
@@ -348,9 +365,14 @@ func (obs *checkerObserver) RemoteSegment(ctx context.Context, segment *metainfo
 				segmentAge = time.Since(segment.CreationDate)
 			}
 			mon.IntVal("checker_segment_time_until_irreparable").Observe(int64(segmentAge.Seconds())) //mon:locked
+			stats.segmentTimeUntilIrreparable.Observe(int64(segmentAge.Seconds()))
 
 			obs.monStats.remoteSegmentsLost++
+			stats.iterationAggregates.remoteSegmentsLost++
+
 			mon.Counter("checker_segments_below_min_req").Inc(1) //mon:locked
+			stats.segmentsBelowMinReq.Inc(1)
+
 			// make an entry into the irreparable table
 			segmentInfo := &internalpb.IrreparableSegment{
 				Path:               key,
@@ -366,13 +388,25 @@ func (obs *checkerObserver) RemoteSegment(ctx context.Context, segment *metainfo
 				obs.log.Error("error handling irreparable segment to queue", zap.Error(err))
 				return nil
 			}
-	} else if numHealthy > repairThreshold && numHealthy <= (repairThreshold+len(obs.monStats.remoteSegmentsOverThreshold)) {
-		// record metrics for segments right above repair threshold
-		// numHealthy=repairThreshold+1 through numHealthy=repairThreshold+5
-		for i := range obs.monStats.remoteSegmentsOverThreshold {
-			if numHealthy == (repairThreshold + i + 1) {
-				obs.monStats.remoteSegmentsOverThreshold[i]++
-				break
+	} else {
+		if numHealthy > repairThreshold && numHealthy <= (repairThreshold+len(obs.monStats.remoteSegmentsOverThreshold)) {
+			// record metrics for segments right above repair threshold
+			// numHealthy=repairThreshold+1 through numHealthy=repairThreshold+5
+			for i := range obs.monStats.remoteSegmentsOverThreshold {
+				if numHealthy == (repairThreshold + i + 1) {
+					obs.monStats.remoteSegmentsOverThreshold[i]++
+					break
+				}
+			}
+		}
+		if numHealthy > repairThreshold && numHealthy <= (repairThreshold+len(stats.iterationAggregates.remoteSegmentsOverThreshold)) {
+			// record metrics for segments right above repair threshold
+			// numHealthy=repairThreshold+1 through numHealthy=repairThreshold+5
+			for i := range stats.iterationAggregates.remoteSegmentsOverThreshold {
+				if numHealthy == (repairThreshold + i + 1) {
+					stats.iterationAggregates.remoteSegmentsOverThreshold[i]++
+					break
+				}
 			}
 		}
 	}
@@ -385,6 +419,9 @@ func (obs *checkerObserver) Object(ctx context.Context, object *metainfo.Object)
 
 	obs.monStats.objectsChecked++
 
+	stats := obs.getStatsByRS(object.LastSegment.Redundancy)
+	stats.iterationAggregates.objectsChecked++
+
 	return nil
 }
 
diff --git a/satellite/repair/checker/checkerstats.go b/satellite/repair/checker/checkerstats.go
new file mode 100644
index 000000000..67aa073ca
--- /dev/null
+++ b/satellite/repair/checker/checkerstats.go
@@ -0,0 +1,166 @@
+// Copyright (C) 2020 Storj Labs, Inc.
+// See LICENSE for copying information.
+
+package checker
+
+import (
+	"fmt"
+
+	"github.com/spacemonkeygo/monkit/v3"
+
+	"storj.io/storj/satellite/metainfo/metabase"
+)
+
+// statsCollector holds a *stats for each redundancy scheme
+// seen by the checker. These are chained into the monkit scope for
+// monitoring as they are initialized.
+type statsCollector struct {
+	stats map[string]*stats
+}
+
+func newStatsCollector() *statsCollector {
+	return &statsCollector{
+		stats: make(map[string]*stats),
+	}
+}
+
+func (collector *statsCollector) getStatsByRS(rs string) *stats {
+	stats, ok := collector.stats[rs]
+	if !ok {
+		stats = newStats(rs)
+		mon.Chain(stats)
+		collector.stats[rs] = stats
+	}
+	return stats
+}
+
+// collectAggregates transfers the iteration aggregates into the
+// respective stats monkit metrics at the end of each checker iteration.
+// iterationAggregates is then cleared.
+func (collector *statsCollector) collectAggregates() {
+	for _, stats := range collector.stats {
+		stats.collectAggregates()
+		stats.iterationAggregates = new(aggregateStats)
+	}
+}
+
+// stats is used for collecting and reporting checker metrics.
+//
+// add any new metrics tagged with rs_scheme to this struct and set them
+// in newStats.
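+// Any field added here must also be reported in the Stats method below.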
+type stats struct {
+	iterationAggregates *aggregateStats
+
+	objectsChecked                  *monkit.IntVal
+	remoteSegmentsChecked           *monkit.IntVal
+	remoteSegmentsNeedingRepair     *monkit.IntVal
+	newRemoteSegmentsNeedingRepair  *monkit.IntVal
+	remoteSegmentsLost              *monkit.IntVal
+	objectsLost                     *monkit.IntVal
+	remoteSegmentsFailedToCheck     *monkit.IntVal
+	remoteSegmentsHealthyPercentage *monkit.FloatVal
+
+	// remoteSegmentsOverThreshold[0]=# of healthy=rt+1, remoteSegmentsOverThreshold[1]=# of healthy=rt+2, etc...
+	remoteSegmentsOverThreshold1 *monkit.IntVal
+	remoteSegmentsOverThreshold2 *monkit.IntVal
+	remoteSegmentsOverThreshold3 *monkit.IntVal
+	remoteSegmentsOverThreshold4 *monkit.IntVal
+	remoteSegmentsOverThreshold5 *monkit.IntVal
+
+	segmentsBelowMinReq         *monkit.Counter
+	segmentTotalCount           *monkit.IntVal
+	segmentHealthyCount         *monkit.IntVal
+	segmentAge                  *monkit.IntVal
+	segmentHealth               *monkit.FloatVal
+	injuredSegmentHealth        *monkit.FloatVal
+	segmentTimeUntilIrreparable *monkit.IntVal
+}
+
+// aggregateStats tallies data over the full checker iteration.
+type aggregateStats struct {
+	objectsChecked                 int64
+	remoteSegmentsChecked          int64
+	remoteSegmentsNeedingRepair    int64
+	newRemoteSegmentsNeedingRepair int64
+	remoteSegmentsLost             int64
+	remoteSegmentsFailedToCheck    int64
+	remoteSegmentInfo              []metabase.ObjectLocation
+
+	// remoteSegmentsOverThreshold[0]=# of healthy=rt+1, remoteSegmentsOverThreshold[1]=# of healthy=rt+2, etc...
+	remoteSegmentsOverThreshold [5]int64
+}
+
+func newStats(rs string) *stats {
+	return &stats{
+		iterationAggregates:             new(aggregateStats),
+		objectsChecked:                  monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "remote_objects_checked").WithTag("rs_scheme", rs)),
+		remoteSegmentsChecked:           monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "remote_segments_checked").WithTag("rs_scheme", rs)),
+		remoteSegmentsNeedingRepair:     monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "remote_segments_needing_repair").WithTag("rs_scheme", rs)),
+		newRemoteSegmentsNeedingRepair:  monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "new_remote_segments_needing_repair").WithTag("rs_scheme", rs)),
+		remoteSegmentsLost:              monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "remote_segments_lost").WithTag("rs_scheme", rs)),
+		objectsLost:                     monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "objects_lost").WithTag("rs_scheme", rs)),
+		remoteSegmentsFailedToCheck:     monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "remote_segments_failed_to_check").WithTag("rs_scheme", rs)),
+		remoteSegmentsHealthyPercentage: monkit.NewFloatVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "remote_segments_healthy_percentage").WithTag("rs_scheme", rs)),
+		remoteSegmentsOverThreshold1:    monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "remote_segments_over_threshold_1").WithTag("rs_scheme", rs)),
+		remoteSegmentsOverThreshold2:    monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "remote_segments_over_threshold_2").WithTag("rs_scheme", rs)),
+		remoteSegmentsOverThreshold3:    monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "remote_segments_over_threshold_3").WithTag("rs_scheme", rs)),
+		remoteSegmentsOverThreshold4:    monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "remote_segments_over_threshold_4").WithTag("rs_scheme", rs)),
+		remoteSegmentsOverThreshold5:    monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "remote_segments_over_threshold_5").WithTag("rs_scheme", rs)),
+		segmentsBelowMinReq:             monkit.NewCounter(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "checker_segments_below_min_req").WithTag("rs_scheme", rs)),
+		segmentTotalCount:               monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "checker_segment_total_count").WithTag("rs_scheme", rs)),
+		segmentHealthyCount:             monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "checker_segment_healthy_count").WithTag("rs_scheme", rs)),
+		segmentAge:                      monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "checker_segment_age").WithTag("rs_scheme", rs)),
+		segmentHealth:                   monkit.NewFloatVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "checker_segment_health").WithTag("rs_scheme", rs)),
+		injuredSegmentHealth:            monkit.NewFloatVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "checker_injured_segment_health").WithTag("rs_scheme", rs)),
+		segmentTimeUntilIrreparable:     monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "checker_segment_time_until_irreparable").WithTag("rs_scheme", rs)),
+	}
+}
+
+func (stats *stats) collectAggregates() {
+	stats.objectsChecked.Observe(stats.iterationAggregates.objectsChecked)
+	stats.remoteSegmentsChecked.Observe(stats.iterationAggregates.remoteSegmentsChecked)
+	stats.remoteSegmentsNeedingRepair.Observe(stats.iterationAggregates.remoteSegmentsNeedingRepair)
+	stats.newRemoteSegmentsNeedingRepair.Observe(stats.iterationAggregates.newRemoteSegmentsNeedingRepair)
+	stats.remoteSegmentsLost.Observe(stats.iterationAggregates.remoteSegmentsLost)
+	stats.objectsLost.Observe(int64(len(stats.iterationAggregates.remoteSegmentInfo)))
+	stats.remoteSegmentsFailedToCheck.Observe(stats.iterationAggregates.remoteSegmentsFailedToCheck)
+	stats.remoteSegmentsOverThreshold1.Observe(stats.iterationAggregates.remoteSegmentsOverThreshold[0])
+	stats.remoteSegmentsOverThreshold2.Observe(stats.iterationAggregates.remoteSegmentsOverThreshold[1])
+	stats.remoteSegmentsOverThreshold3.Observe(stats.iterationAggregates.remoteSegmentsOverThreshold[2])
+	stats.remoteSegmentsOverThreshold4.Observe(stats.iterationAggregates.remoteSegmentsOverThreshold[3])
+	stats.remoteSegmentsOverThreshold5.Observe(stats.iterationAggregates.remoteSegmentsOverThreshold[4])
+
+	allUnhealthy := stats.iterationAggregates.remoteSegmentsNeedingRepair + stats.iterationAggregates.remoteSegmentsFailedToCheck
+	allChecked := stats.iterationAggregates.remoteSegmentsChecked
+	allHealthy := allChecked - allUnhealthy
+
+	stats.remoteSegmentsHealthyPercentage.Observe(100 * float64(allHealthy) / float64(allChecked))
+}
+
+// Stats implements the monkit.StatSource interface.
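+// It forwards the callback to each tagged metric tracked by this struct.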
+func (stats *stats) Stats(cb func(key monkit.SeriesKey, field string, val float64)) {
+	stats.objectsChecked.Stats(cb)
+	stats.remoteSegmentsChecked.Stats(cb)
+	stats.remoteSegmentsNeedingRepair.Stats(cb)
+	stats.newRemoteSegmentsNeedingRepair.Stats(cb)
+	stats.remoteSegmentsLost.Stats(cb)
+	stats.objectsLost.Stats(cb)
+	stats.remoteSegmentsFailedToCheck.Stats(cb)
+	stats.remoteSegmentsOverThreshold1.Stats(cb)
+	stats.remoteSegmentsOverThreshold2.Stats(cb)
+	stats.remoteSegmentsOverThreshold3.Stats(cb)
+	stats.remoteSegmentsOverThreshold4.Stats(cb)
+	stats.remoteSegmentsOverThreshold5.Stats(cb)
+	stats.remoteSegmentsHealthyPercentage.Stats(cb)
+	stats.segmentsBelowMinReq.Stats(cb)
+	stats.segmentTotalCount.Stats(cb)
+	stats.segmentHealthyCount.Stats(cb)
+	stats.segmentAge.Stats(cb)
+	stats.segmentHealth.Stats(cb)
+	stats.injuredSegmentHealth.Stats(cb)
+	stats.segmentTimeUntilIrreparable.Stats(cb)
+}
+
+func getRSString(min, repair, success, total int) string {
+	return fmt.Sprintf("%d/%d/%d/%d", min, repair, success, total)
+}
diff --git a/satellite/repair/checker/config.go b/satellite/repair/checker/config.go
index efe4ff6a5..a800dfe22 100644
--- a/satellite/repair/checker/config.go
+++ b/satellite/repair/checker/config.go
@@ -143,7 +143,7 @@ func (ros *RepairOverrides) GetMap() RepairOverridesMap {
 		overrideMap: make(map[string]int32),
 	}
 	for _, ro := range ros.List {
-		key := getRSKey(ro.Min, ro.Success, ro.Total)
+		key := getRepairOverrideKey(ro.Min, ro.Success, ro.Total)
 		newMap.overrideMap[key] = ro.Override
 	}
 	return newMap
@@ -158,16 +158,16 @@ type RepairOverridesMap struct {
 
 // GetOverrideValuePB returns the override value for a pb RS scheme if it exists, or 0 otherwise.
 func (rom *RepairOverridesMap) GetOverrideValuePB(rs *pb.RedundancyScheme) int32 {
-	key := getRSKey(int(rs.MinReq), int(rs.SuccessThreshold), int(rs.Total))
+	key := getRepairOverrideKey(int(rs.MinReq), int(rs.SuccessThreshold), int(rs.Total))
 	return rom.overrideMap[key]
 }
 
 // GetOverrideValue returns the override value for an RS scheme if it exists, or 0 otherwise.
 func (rom *RepairOverridesMap) GetOverrideValue(rs storj.RedundancyScheme) int32 {
-	key := getRSKey(int(rs.RequiredShares), int(rs.OptimalShares), int(rs.TotalShares))
+	key := getRepairOverrideKey(int(rs.RequiredShares), int(rs.OptimalShares), int(rs.TotalShares))
 	return rom.overrideMap[key]
 }
 
-func getRSKey(min, success, total int) string {
+func getRepairOverrideKey(min, success, total int) string {
 	return fmt.Sprintf("%d/%d/%d", min, success, total)
 }

From 187680b0c1d86c4194e991609c36bd91482f263f Mon Sep 17 00:00:00 2001
From: Ivan Fraixedes
Date: Tue, 15 Dec 2020 18:58:54 +0100
Subject: [PATCH 3/7] scripts: Fix typo in a comment

Change-Id: If79e778e786db06d2263bcd5393f639a4bb92542
---
 scripts/test-sim.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/scripts/test-sim.sh b/scripts/test-sim.sh
index ff5e78ad3..dbb629614 100755
--- a/scripts/test-sim.sh
+++ b/scripts/test-sim.sh
@@ -17,7 +17,7 @@ make -C "$SCRIPTDIR"/.. install-sim
 
 echo "Overriding default max segment size to 6MiB"
 GOBIN=$TMP go install -v -ldflags "-X 'storj.io/uplink.maxSegmentSize=6MiB'" storj.io/storj/cmd/uplink
 
-# use modifed version of uplink
+# use modified version of uplink
 export PATH=$TMP:$PATH
 
 export STORJ_NETWORK_DIR=$TMP

From 8c52bb3a18217e81bc686cbb6ee5514f1fb2d743 Mon Sep 17 00:00:00 2001
From: Cameron Ayer
Date: Tue, 15 Dec 2020 17:16:54 -0500
Subject: [PATCH 4/7] satellite/checker: use numHealthy as segment health in repair queue

A few weeks ago it was discovered that the segment health function
was not working as expected with production values. As a bandaid, we
decided to insert the number of healthy pieces into the segment health
column. This should have effectively reverted our means of prioritizing
repair to the previous implementation.

However, it turns out that the bandaid was placed into the code which
removes items from the irreparable db and inserts them into the repair
queue.

This change inserts the number of healthy pieces into the repair queue
in the RemoteSegment method.

Change-Id: Iabfc7984df0a928066b69e9aecb6f615253f1ad2
---
 satellite/repair/checker/checker.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/satellite/repair/checker/checker.go b/satellite/repair/checker/checker.go
index 04d1c1efe..f1c2a4f8d 100644
--- a/satellite/repair/checker/checker.go
+++ b/satellite/repair/checker/checker.go
@@ -332,7 +332,7 @@ func (obs *checkerObserver) RemoteSegment(ctx context.Context, segment *metainfo
 			Path:         key,
 			LostPieces:   missingPieces,
 			InsertedTime: time.Now().UTC(),
-		}, segmentHealth)
+		}, float64(numHealthy))
 		if err != nil {
 			obs.log.Error("error adding injured segment to queue", zap.Error(err))
 			return nil

From 12055e7864b51e7c038663f27dcee19f16ef0df1 Mon Sep 17 00:00:00 2001
From: Egon Elbre
Date: Tue, 15 Dec 2020 13:45:19 +0200
Subject: [PATCH 5/7] all: minor cleanups

Change-Id: I4248dbe36a62a223b06135254b32851485a2eec1
---
 cmd/uplink/cmd/cp.go                      |  2 +-
 private/dbutil/cockroachutil/db.go        |  4 +++-
 satellite/accounting/projectusage_test.go |  2 +-
 satellite/audit/verifier.go               |  6 +++---
 satellite/referrals/users.go              | 15 ++++++---------
 storagenode/bandwidth/db_test.go          |  2 +-
 6 files changed, 15 insertions(+), 16 deletions(-)

diff --git a/cmd/uplink/cmd/cp.go b/cmd/uplink/cmd/cp.go
index bea9435c5..31e96222e 100644
--- a/cmd/uplink/cmd/cp.go
+++ b/cmd/uplink/cmd/cp.go
@@ -185,7 +185,7 @@ func download(ctx context.Context, src fpath.FPath, dst fpath.FPath, showProgres
 	}
 
 	if fileInfo, err := os.Stat(dst.Path()); err == nil && fileInfo.IsDir() {
-		dst = dst.Join((src.Base()))
+		dst = dst.Join(src.Base())
 	}
 
 	var file *os.File
diff --git a/private/dbutil/cockroachutil/db.go b/private/dbutil/cockroachutil/db.go
index f94e65926..9c3146260 100644
--- a/private/dbutil/cockroachutil/db.go
+++ b/private/dbutil/cockroachutil/db.go
@@ -13,6 +13,7 @@ import (
 	"github.com/spacemonkeygo/monkit/v3"
 	"github.com/zeebo/errs"
 
+	"storj.io/common/context2"
 	"storj.io/storj/private/dbutil"
 	"storj.io/storj/private/dbutil/pgutil"
 	"storj.io/storj/private/tagsql"
@@ -56,7 +57,8 @@ func OpenUnique(ctx context.Context, connStr string, schemaPrefix string) (db *d
 	}
 
 	cleanup := func(cleanupDB tagsql.DB) error {
-		_, err := cleanupDB.Exec(context.TODO(), "DROP DATABASE "+pgutil.QuoteIdentifier(schemaName))
+		ctx := context2.WithoutCancellation(ctx)
+		_, err := cleanupDB.Exec(ctx, "DROP DATABASE "+pgutil.QuoteIdentifier(schemaName))
 		return errs.Wrap(err)
 	}
 
diff --git a/satellite/accounting/projectusage_test.go b/satellite/accounting/projectusage_test.go
index f3497ef07..dc236bf58 100644
--- a/satellite/accounting/projectusage_test.go
+++ b/satellite/accounting/projectusage_test.go
@@ -362,7 +362,7 @@ func TestUsageRollups(t *testing.T) {
 		)
 
 		now := time.Now()
-		start := now.Add(tallyInterval * time.Duration(-tallyIntervals))
+		start := now.Add(tallyInterval * -tallyIntervals)
 
 		db := planet.Satellites[0].DB
 
diff --git a/satellite/audit/verifier.go b/satellite/audit/verifier.go
index f95d2ef1c..8a95ac0e5 100644
--- a/satellite/audit/verifier.go
+++ b/satellite/audit/verifier.go
@@ -94,7 +94,7 @@ func (verifier *Verifier) Verify(ctx context.Context, path storj.Path, skip map[
 		}
 		return Report{}, err
 	}
-	if pointer.ExpirationDate != (time.Time{}) && pointer.ExpirationDate.Before(time.Now()) {
+	if !pointer.ExpirationDate.IsZero() && pointer.ExpirationDate.Before(time.Now()) {
 		verifier.log.Debug("segment expired before Verify")
 		return Report{}, nil
 	}
@@ -368,7 +368,7 @@ func (verifier *Verifier) Reverify(ctx context.Context, path storj.Path) (report
 		}
 		return Report{}, err
 	}
-	if pointer.ExpirationDate != (time.Time{}) && pointer.ExpirationDate.Before(time.Now()) {
+	if !pointer.ExpirationDate.IsZero() && pointer.ExpirationDate.Before(time.Now()) {
 		verifier.log.Debug("Segment expired before Reverify")
 		return Report{}, nil
 	}
@@ -402,7 +402,7 @@ func (verifier *Verifier) Reverify(ctx context.Context, path storj.Path) (report
 				verifier.log.Debug("Reverify: error getting pending pointer from metainfo", zap.Stringer("Node ID", pending.NodeID), zap.Error(err))
 				return
 			}
-			if pendingPointer.ExpirationDate != (time.Time{}) && pendingPointer.ExpirationDate.Before(time.Now().UTC()) {
+			if !pendingPointer.ExpirationDate.IsZero() && pendingPointer.ExpirationDate.Before(time.Now().UTC()) {
 				verifier.log.Debug("Reverify: segment already expired", zap.Stringer("Node ID", pending.NodeID))
 				ch <- result{nodeID: pending.NodeID, status: skipped}
 				return
diff --git a/satellite/referrals/users.go b/satellite/referrals/users.go
index b556dc2fb..9da639d66 100644
--- a/satellite/referrals/users.go
+++ b/satellite/referrals/users.go
@@ -26,21 +26,18 @@ type CreateUser struct {
 
 // IsValid checks CreateUser validity and returns error describing whats wrong.
 func (user *CreateUser) IsValid() error {
-	var errors []error
-
-	errors = append(errors, console.ValidateFullName(user.FullName))
-	errors = append(errors, console.ValidatePassword(user.Password))
+	var group errs.Group
+	group.Add(console.ValidateFullName(user.FullName))
+	group.Add(console.ValidatePassword(user.Password))
 
 	// validate email
 	_, err := mail.ParseAddress(user.Email)
-	errors = append(errors, err)
+	group.Add(err)
 
 	if user.ReferralToken != "" {
 		_, err := uuid.FromString(user.ReferralToken)
-		if err != nil {
-			errors = append(errors, err)
-		}
+		group.Add(err)
 	}
 
-	return errs.Combine(errors...)
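+	// group.Err combines the collected errors, returning nil when none were added.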
+	return group.Err()
 }
diff --git a/storagenode/bandwidth/db_test.go b/storagenode/bandwidth/db_test.go
index 60ccab9ad..0a2d1411f 100644
--- a/storagenode/bandwidth/db_test.go
+++ b/storagenode/bandwidth/db_test.go
@@ -395,7 +395,7 @@ func TestBandwidthDailyRollups(t *testing.T) {
 	require.NoError(t, err)
 
 	// last day add bandwidth that won't be rolled up
-	day := startDate.Add(time.Hour * 24 * time.Duration(days-1))
+	day := startDate.Add(time.Hour * 24 * (days - 1))
 
 	for _, satellite := range satellites {
 		usageRollup := &bandwidth.UsageRollup{

From 0e832337002488b58bebac7e5a9a4c25bae29b1a Mon Sep 17 00:00:00 2001
From: Jeff Wendling
Date: Tue, 15 Dec 2020 16:46:59 -0500
Subject: [PATCH 6/7] storj-sim: add node id to default access

Change-Id: I59874fe8d73a832d04a5597c98d05971a74d2164
---
 cmd/storj-sim/network.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/cmd/storj-sim/network.go b/cmd/storj-sim/network.go
index acdb7140b..3d1456356 100644
--- a/cmd/storj-sim/network.go
+++ b/cmd/storj-sim/network.go
@@ -43,7 +43,7 @@ const (
 )
 
 var (
-	defaultAccess = "17jgVrPRktsquJQFzpT533WHmZnF6QDkuv8w3Ry5XPzAkh3vj7D1dbJ5MatQRiyRE2ZEiA1Y6fYnhoWqr2n7VgycdXSUPz1QzhthBsHqGXCrRcjSp8RbbVE1VJqDej9nLgB5YDPh3Q5JrVjQeMe9saHAL5rE5tUYJAeynVdre8HeTJMXcwau5"
+	defaultAccess = "13GKzTN8PoLvMFuN9JDZxMhyKGACmdoZuYXYSRNZQqiDDwv2Jm1FjVuZRHvNZ4Eh1Ganzi4cNV5N3fNb17ycSYqJQAdAPSSyXM1KqSbDFqYTbZAN2LTgxKJVkrnKGCGd2a93sM9eKyhfoXrukPhYjfk2dUpRzsCPsAVFVT4Cm2v7RpjEiwN1L42z"
 )
 
 const (

From 2fd7809e54611d12c5ed231f3a2cf2f286b06612 Mon Sep 17 00:00:00 2001
From: Qweder93
Date: Thu, 17 Dec 2020 01:35:44 +0200
Subject: [PATCH 7/7] storagenode/payout: stefanbenten satellite name added to payout history, satellites with no held history removed from list

Change-Id: I96861058ccb9c8ce52698796c91b999eaec1f6e6
---
 storagenode/console/consoleapi/payout_test.go | 11 ++---------
 storagenode/payout/service.go                 |  5 +++++
 2 files changed, 7 insertions(+), 9 deletions(-)

diff --git a/storagenode/console/consoleapi/payout_test.go b/storagenode/console/consoleapi/payout_test.go
index 5ccc2c71a..35a83e335 100644
--- a/storagenode/console/consoleapi/payout_test.go
+++ b/storagenode/console/consoleapi/payout_test.go
@@ -351,7 +351,7 @@ func TestHeldAmountApi(t *testing.T) {
 	})
 
 	t.Run("test HeldbackHistory", func(t *testing.T) {
-		date := time.Now().UTC().AddDate(0, -2, 0)
+		date := time.Now().UTC().AddDate(0, -2, 0).Round(time.Minute)
 		err = reputationDB.Store(context.Background(), reputation.Stats{
 			SatelliteID: satellite.ID(),
 			JoinedAt:    date,
@@ -376,15 +376,8 @@ func TestHeldAmountApi(t *testing.T) {
 			JoinedAt: date.Round(time.Minute),
 		}
 
-		stefanID, err := storj.NodeIDFromString("118UWpMCHzs6CvSgWd9BfFVjw5K9pZbJjkfZJexMtSkmKxvvAW")
-		require.NoError(t, err)
-
-		held2 := payout.SatelliteHeldHistory{
-			SatelliteID: stefanID,
-		}
-
 		var periods []payout.SatelliteHeldHistory
-		periods = append(periods, held, held2)
+		periods = append(periods, held)
 
 		expected, err := json.Marshal(periods)
 		require.NoError(t, err)
diff --git a/storagenode/payout/service.go b/storagenode/payout/service.go
index b55f39e2b..9a41979a3 100644
--- a/storagenode/payout/service.go
+++ b/storagenode/payout/service.go
@@ -180,6 +180,10 @@ func (service *Service) AllHeldbackHistory(ctx context.Context) (result []Satell
 		return nil, ErrPayoutService.Wrap(err)
 	}
 
+	if helds == nil {
+		continue
+	}
+
 	disposed, err := service.db.SatellitesDisposedHistory(ctx, satellitesIDs[i])
 	if err != nil {
 		return nil, ErrPayoutService.Wrap(err)
 	}
@@ -202,6 +206,7 @@ func (service *Service) AllHeldbackHistory(ctx context.Context) (result []Satell
 	history.TotalDisposed = disposed
 	history.SatelliteID = satellitesIDs[i]
+	history.SatelliteName = "stefan-benten"
 
 	if satellitesIDs[i] != service.stefanSatellite {
 		url, err := service.trust.GetNodeURL(ctx, satellitesIDs[i])