Merge 'master' branch

Change-Id: I4a3e45a2a2cdacfd87d16b148cfb4c6671c20b15
Michal Niewrzal 2020-12-17 13:16:48 +01:00
commit 2381ca2810
15 changed files with 491 additions and 187 deletions

View File

@ -43,7 +43,7 @@ const (
)
var (
defaultAccess = "17jgVrPRktsquJQFzpT533WHmZnF6QDkuv8w3Ry5XPzAkh3vj7D1dbJ5MatQRiyRE2ZEiA1Y6fYnhoWqr2n7VgycdXSUPz1QzhthBsHqGXCrRcjSp8RbbVE1VJqDej9nLgB5YDPh3Q5JrVjQeMe9saHAL5rE5tUYJAeynVdre8HeTJMXcwau5"
defaultAccess = "13GKzTN8PoLvMFuN9JDZxMhyKGACmdoZuYXYSRNZQqiDDwv2Jm1FjVuZRHvNZ4Eh1Ganzi4cNV5N3fNb17ycSYqJQAdAPSSyXM1KqSbDFqYTbZAN2LTgxKJVkrnKGCGd2a93sM9eKyhfoXrukPhYjfk2dUpRzsCPsAVFVT4Cm2v7RpjEiwN1L42z"
)
const (

View File

@ -153,71 +153,74 @@ func registerAccess(cmd *cobra.Command, args []string) (err error) {
if len(args) == 0 {
return errs.New("no access specified")
}
_, err = register(args[0], registerCfg.AuthService, registerCfg.Public)
return err
}
if registerCfg.AuthService == "" {
return errs.New("no auth service address provided")
func register(accessRaw, authService string, public bool) (accessKey string, err error) {
if authService == "" {
return "", errs.New("no auth service address provided")
}
accessRaw := args[0]
// try assuming that accessRaw is a named access
access, err := registerCfg.GetNamedAccess(accessRaw)
if err == nil && access != nil {
accessRaw, err = access.Serialize()
if err != nil {
return errs.New("error serializing named access '%s': %w", accessRaw, err)
return "", errs.New("error serializing named access '%s': %w", accessRaw, err)
}
}
postData, err := json.Marshal(map[string]interface{}{
"access_grant": accessRaw,
"public": registerCfg.Public,
"public": public,
})
if err != nil {
return errs.Wrap(err)
return accessKey, errs.Wrap(err)
}
resp, err := http.Post(fmt.Sprintf("%s/v1/access", registerCfg.AuthService), "application/json", bytes.NewReader(postData))
resp, err := http.Post(fmt.Sprintf("%s/v1/access", authService), "application/json", bytes.NewReader(postData))
if err != nil {
return err
return "", err
}
defer func() { err = errs.Combine(err, resp.Body.Close()) }()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
return "", err
}
respBody := make(map[string]string)
if err := json.Unmarshal(body, &respBody); err != nil {
return errs.New("unexpected response from auth service: %s", string(body))
return "", errs.New("unexpected response from auth service: %s", string(body))
}
accessKey, ok := respBody["access_key_id"]
if !ok {
return errs.New("access_key_id missing in response")
return "", errs.New("access_key_id missing in response")
}
secretKey, ok := respBody["secret_key"]
if !ok {
return errs.New("secret_key missing in response")
return "", errs.New("secret_key missing in response")
}
fmt.Println("=========== CREDENTIALS =========================================================")
fmt.Println("========== CREDENTIALS ===================================================================")
fmt.Println("Access Key ID: ", accessKey)
fmt.Println("Secret Key: ", secretKey)
fmt.Println("Endpoint: ", respBody["endpoint"])
fmt.Println("Secret Key : ", secretKey)
fmt.Println("Endpoint : ", respBody["endpoint"])
// update AWS credential file if requested
if registerCfg.AWSProfile != "" {
credentialsPath, err := getAwsCredentialsPath()
if err != nil {
return err
return "", err
}
err = writeAWSCredentials(credentialsPath, registerCfg.AWSProfile, accessKey, secretKey)
if err != nil {
return err
return "", err
}
}
return nil
return accessKey, nil
}
// getAwsCredentialsPath returns the expected AWS credentials path.
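Note: a hedged sketch of the exchange the refactored register helper performs and how a caller can use its new return value; the serializedAccess variable and the printed values are placeholders, while the endpoint path and JSON field names come from the code above.

// register POSTs the serialized access grant to <authService>/v1/access with a
// body of the form {"access_grant": "...", "public": true} and expects
// access_key_id, secret_key and endpoint in the JSON response.
accessKey, err := register(serializedAccess, "https://auth.tardigradeshare.io", true)
if err != nil {
    return err
}
fmt.Println("registered access key id:", accessKey)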

View File

@ -185,7 +185,7 @@ func download(ctx context.Context, src fpath.FPath, dst fpath.FPath, showProgres
}
if fileInfo, err := os.Stat(dst.Path()); err == nil && fileInfo.IsDir() {
dst = dst.Join((src.Base()))
dst = dst.Join(src.Base())
}
var file *os.File

View File

@ -7,8 +7,10 @@ import (
"fmt"
"io/ioutil"
"net/url"
"os"
"path/filepath"
"strings"
"text/tabwriter"
"time"
"github.com/spf13/cobra"
@ -31,7 +33,13 @@ var shareCfg struct {
NotAfter string `help:"disallow access after this time (e.g. '+2h', '2020-01-02T15:01:01-01:00')" basic-help:"true"`
AllowedPathPrefix []string `help:"whitelist of path prefixes to require, overrides the [allowed-path-prefix] arguments"`
ExportTo string `default:"" help:"path to export the shared access to" basic-help:"true"`
BaseURL string `default:"https://link.tardigradeshare.io" help:"the base url for link sharing"`
BaseURL string `default:"https://link.tardigradeshare.io" help:"the base url for link sharing" basic-help:"true"`
Register bool `default:"false" help:"if true, creates and registers access grant" basic-help:"true"`
URL bool `default:"false" help:"if true, returns a url for the shared path. implies --register and --public" basic-help:"true"`
DNS string `default:"" help:"specify your custom hostname. if set, returns dns settings for web hosting. implies --register and --public" basic-help:"true"`
AuthService string `default:"https://auth.tardigradeshare.io" help:"url for shared auth service" basic-help:"true"`
Public bool `default:"false" help:"if true, the access will be public. --dns and --url override this" basic-help:"true"`
// Share requires information about the current access
AccessConfig
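Note: the new flags imply one another; a minimal sketch of the rule shareMain applies further down (the variable names mirror the diff, the surrounding context is hypothetical).

// --url and --dns force both registration and a public access grant;
// --register alone keeps the access private unless --public is also set.
doRegister := shareCfg.Register || shareCfg.URL || shareCfg.DNS != ""
isPublic := shareCfg.Public || shareCfg.URL || shareCfg.DNS != ""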
@ -46,11 +54,195 @@ func init() {
Short: "Shares restricted access to objects.",
RunE: shareMain,
}
RootCmd.AddCommand(shareCmd)
process.Bind(shareCmd, &shareCfg, defaults, cfgstruct.ConfDir(getConfDir()))
}
func shareMain(cmd *cobra.Command, args []string) (err error) {
newAccessData, sharePrefixes, permission, err := createAccessGrant(args)
if err != nil {
return err
}
var accessKey string
if shareCfg.Register || shareCfg.URL || shareCfg.DNS != "" {
isPublic := (shareCfg.Public || shareCfg.URL || shareCfg.DNS != "")
accessKey, err = register(newAccessData, shareCfg.AuthService, isPublic)
if err != nil {
return err
}
fmt.Println("Public Access: ", isPublic)
if len(shareCfg.AllowedPathPrefix) == 1 && !permission.AllowUpload && !permission.AllowDelete {
if shareCfg.URL {
if err = createURL(accessKey, sharePrefixes); err != nil {
return err
}
}
if shareCfg.DNS != "" {
if err = createDNS(accessKey); err != nil {
return err
}
}
}
}
if shareCfg.ExportTo != "" {
// convert to an absolute path, mostly for output purposes.
exportTo, err := filepath.Abs(shareCfg.ExportTo)
if err != nil {
return Error.Wrap(err)
}
if err := ioutil.WriteFile(exportTo, []byte(newAccessData+"\n"), 0600); err != nil {
return Error.Wrap(err)
}
fmt.Println("Exported to:", exportTo)
}
return nil
}
// Creates access grant for allowed path prefixes.
func createAccessGrant(args []string) (newAccessData string, sharePrefixes []sharePrefixExtension, permission uplink.Permission, err error) {
now := time.Now()
notBefore, err := parseHumanDate(shareCfg.NotBefore, now)
if err != nil {
return newAccessData, sharePrefixes, permission, err
}
notAfter, err := parseHumanDate(shareCfg.NotAfter, now)
if err != nil {
return newAccessData, sharePrefixes, permission, err
}
if len(shareCfg.AllowedPathPrefix) == 0 {
// if the --allowed-path-prefix flag is not set,
// use any arguments as allowed path prefixes
for _, arg := range args {
shareCfg.AllowedPathPrefix = append(shareCfg.AllowedPathPrefix, strings.Split(arg, ",")...)
}
}
var uplinkSharePrefixes []uplink.SharePrefix
for _, path := range shareCfg.AllowedPathPrefix {
p, err := fpath.New(path)
if err != nil {
return newAccessData, sharePrefixes, permission, err
}
if p.IsLocal() {
return newAccessData, sharePrefixes, permission, errs.New("required path must be remote: %q", path)
}
uplinkSharePrefix := uplink.SharePrefix{
Bucket: p.Bucket(),
Prefix: p.Path(),
}
sharePrefixes = append(sharePrefixes, sharePrefixExtension{
uplinkSharePrefix: uplinkSharePrefix,
hasTrailingSlash: strings.HasSuffix(path, "/"),
})
uplinkSharePrefixes = append(uplinkSharePrefixes, uplinkSharePrefix)
}
access, err := shareCfg.GetAccess()
if err != nil {
return newAccessData, sharePrefixes, permission, err
}
permission = uplink.Permission{}
permission.AllowDelete = !shareCfg.DisallowDeletes && !shareCfg.Readonly
permission.AllowList = !shareCfg.DisallowLists && !shareCfg.Writeonly
permission.AllowDownload = !shareCfg.DisallowReads && !shareCfg.Writeonly
permission.AllowUpload = !shareCfg.DisallowWrites && !shareCfg.Readonly
permission.NotBefore = notBefore
permission.NotAfter = notAfter
newAccess, err := access.Share(permission, uplinkSharePrefixes...)
if err != nil {
return newAccessData, sharePrefixes, permission, err
}
newAccessData, err = newAccess.Serialize()
if err != nil {
return newAccessData, sharePrefixes, permission, err
}
satelliteAddr, _, _, err := parseAccess(newAccessData)
if err != nil {
return newAccessData, sharePrefixes, permission, err
}
fmt.Println("Sharing access to satellite", satelliteAddr)
fmt.Println("=========== ACCESS RESTRICTIONS ==========================================================")
fmt.Println("Download :", formatPermission(permission.AllowDownload))
fmt.Println("Upload :", formatPermission(permission.AllowUpload))
fmt.Println("Lists :", formatPermission(permission.AllowList))
fmt.Println("Deletes :", formatPermission(permission.AllowDelete))
fmt.Println("NotBefore :", formatTimeRestriction(permission.NotBefore))
fmt.Println("NotAfter :", formatTimeRestriction(permission.NotAfter))
fmt.Println("Paths :", formatPaths(sharePrefixes))
fmt.Println("=========== SERIALIZED ACCESS WITH THE ABOVE RESTRICTIONS TO SHARE WITH OTHERS ===========")
fmt.Println("Access :", newAccessData)
return newAccessData, sharePrefixes, permission, nil
}
// Creates linksharing url for allowed path prefixes.
func createURL(newAccessData string, sharePrefixes []sharePrefixExtension) (err error) {
p, err := fpath.New(shareCfg.AllowedPathPrefix[0])
if err != nil {
return err
}
fmt.Println("=========== BROWSER URL ==================================================================")
fmt.Println("REMINDER : Object key must end in '/' when trying to share recursively")
var printFormat string
if p.Path() == "" || !sharePrefixes[0].hasTrailingSlash { // Check if the path is empty (aka sharing the entire bucket) or the path is not a directory or an object that ends in "/".
printFormat = "URL : %s/%s/%s/%s\n"
} else {
printFormat = "URL : %s/%s/%s/%s/\n"
}
fmt.Printf(printFormat, shareCfg.BaseURL, url.PathEscape(newAccessData), p.Bucket(), p.Path())
return nil
}
// Creates dns record info for allowed path prefixes.
func createDNS(accessKey string) (err error) {
p, err := fpath.New(shareCfg.AllowedPathPrefix[0])
if err != nil {
return err
}
CNAME, err := url.Parse(shareCfg.BaseURL)
if err != nil {
return err
}
minWidth := len(shareCfg.DNS) + 5 // add 5 spaces to account for "txt-"
w := new(tabwriter.Writer)
w.Init(os.Stdout, minWidth, minWidth, 0, '\t', 0)
defer func() {
err = errs.Combine(err, w.Flush())
}()
var printStorjRoot string
if p.Path() == "" {
printStorjRoot = fmt.Sprintf("txt-%s\tIN\tTXT \tstorj-root:%s", shareCfg.DNS, p.Bucket())
} else {
printStorjRoot = fmt.Sprintf("txt-%s\tIN\tTXT \tstorj-root:%s/%s", shareCfg.DNS, p.Bucket(), p.Path())
}
fmt.Println("=========== DNS INFO =====================================================================")
fmt.Println("Remember to update the $ORIGIN with your domain name. You may also change the $TTL.")
fmt.Fprintln(w, "$ORIGIN example.com.")
fmt.Fprintln(w, "$TTL 3600")
fmt.Fprintf(w, "%s \tIN\tCNAME\t%s.\n", shareCfg.DNS, CNAME.Host)
fmt.Fprintln(w, printStorjRoot)
fmt.Fprintf(w, "txt-%s\tIN\tTXT \tstorj-access:%s\n", shareCfg.DNS, accessKey)
return nil
}
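Note: a hedged illustration of what createURL and createDNS print for a single read-only prefix; the hostname, bucket, prefix and access key shown are hypothetical, and the record layout follows the format strings above.

// For --url --dns www with an allowed prefix of sj://videos/cats/ and the
// default base URL, the output would look roughly like:
//
//   URL : https://link.tardigradeshare.io/<access key id>/videos/cats/
//
//   $ORIGIN example.com.
//   $TTL    3600
//   www     IN  CNAME  link.tardigradeshare.io.
//   txt-www IN  TXT    storj-root:videos/cats
//   txt-www IN  TXT    storj-access:<access key id>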
func parseHumanDate(date string, now time.Time) (time.Time, error) {
switch {
case date == "":
@ -71,105 +263,10 @@ func parseHumanDate(date string, now time.Time) (time.Time, error) {
}
}
// shareMain is the function executed when shareCmd is called.
func shareMain(cmd *cobra.Command, args []string) (err error) {
now := time.Now()
notBefore, err := parseHumanDate(shareCfg.NotBefore, now)
if err != nil {
return err
}
notAfter, err := parseHumanDate(shareCfg.NotAfter, now)
if err != nil {
return err
}
if len(shareCfg.AllowedPathPrefix) == 0 {
// if the --allowed-path-prefix flag is not set,
// use any arguments as allowed path prefixes
for _, arg := range args {
shareCfg.AllowedPathPrefix = append(shareCfg.AllowedPathPrefix, strings.Split(arg, ",")...)
}
}
var sharePrefixes []uplink.SharePrefix
for _, path := range shareCfg.AllowedPathPrefix {
p, err := fpath.New(path)
if err != nil {
return err
}
if p.IsLocal() {
return errs.New("required path must be remote: %q", path)
}
sharePrefixes = append(sharePrefixes, uplink.SharePrefix{
Bucket: p.Bucket(),
Prefix: p.Path(),
})
}
access, err := shareCfg.GetAccess()
if err != nil {
return err
}
permission := uplink.Permission{}
permission.AllowDelete = !shareCfg.DisallowDeletes && !shareCfg.Readonly
permission.AllowList = !shareCfg.DisallowLists && !shareCfg.Writeonly
permission.AllowDownload = !shareCfg.DisallowReads && !shareCfg.Writeonly
permission.AllowUpload = !shareCfg.DisallowWrites && !shareCfg.Readonly
permission.NotBefore = notBefore
permission.NotAfter = notAfter
newAccess, err := access.Share(permission, sharePrefixes...)
if err != nil {
return err
}
newAccessData, err := newAccess.Serialize()
if err != nil {
return err
}
satelliteAddr, _, _, err := parseAccess(newAccessData)
if err != nil {
return err
}
fmt.Println("Sharing access to satellite", satelliteAddr)
fmt.Println("=========== ACCESS RESTRICTIONS ==========================================================")
fmt.Println("Download :", formatPermission(permission.AllowDownload))
fmt.Println("Upload :", formatPermission(permission.AllowUpload))
fmt.Println("Lists :", formatPermission(permission.AllowList))
fmt.Println("Deletes :", formatPermission(permission.AllowDelete))
fmt.Println("NotBefore :", formatTimeRestriction(permission.NotBefore))
fmt.Println("NotAfter :", formatTimeRestriction(permission.NotAfter))
fmt.Println("Paths :", formatPaths(sharePrefixes))
fmt.Println("=========== SERIALIZED ACCESS WITH THE ABOVE RESTRICTIONS TO SHARE WITH OTHERS ===========")
fmt.Println("Access :", newAccessData)
if len(shareCfg.AllowedPathPrefix) == 1 && !permission.AllowUpload && !permission.AllowDelete {
fmt.Println("=========== BROWSER URL ==================================================================")
p, err := fpath.New(shareCfg.AllowedPathPrefix[0])
if err != nil {
return err
}
fmt.Println("URL :", fmt.Sprintf("%s/%s/%s/%s", shareCfg.BaseURL,
url.PathEscape(newAccessData),
url.PathEscape(p.Bucket()),
url.PathEscape(p.Path())))
}
if shareCfg.ExportTo != "" {
// convert to an absolute path, mostly for output purposes.
exportTo, err := filepath.Abs(shareCfg.ExportTo)
if err != nil {
return Error.Wrap(err)
}
if err := ioutil.WriteFile(exportTo, []byte(newAccessData+"\n"), 0600); err != nil {
return Error.Wrap(err)
}
fmt.Println("Exported to:", exportTo)
}
return nil
// sharePrefixExtension is a temporary struct type. We might want to add hasTrailingSlash bool to `uplink.SharePrefix` directly.
type sharePrefixExtension struct {
uplinkSharePrefix uplink.SharePrefix
hasTrailingSlash bool
}
func formatPermission(allowed bool) string {
@ -186,19 +283,23 @@ func formatTimeRestriction(t time.Time) string {
return formatTime(t)
}
func formatPaths(sharePrefixes []uplink.SharePrefix) string {
func formatPaths(sharePrefixes []sharePrefixExtension) string {
if len(sharePrefixes) == 0 {
return "WARNING! The entire project is shared!"
}
var paths []string
for _, prefix := range sharePrefixes {
path := "sj://" + prefix.Bucket
if len(prefix.Prefix) == 0 {
path += " (entire bucket)"
path := "sj://" + prefix.uplinkSharePrefix.Bucket
if len(prefix.uplinkSharePrefix.Prefix) == 0 {
path += "/ (entire bucket)"
} else {
path += "/" + prefix.Prefix
path += "/" + prefix.uplinkSharePrefix.Prefix
if prefix.hasTrailingSlash {
path += "/"
}
}
paths = append(paths, path)
}

View File

@ -13,6 +13,7 @@ import (
"github.com/spacemonkeygo/monkit/v3"
"github.com/zeebo/errs"
"storj.io/common/context2"
"storj.io/storj/private/dbutil"
"storj.io/storj/private/dbutil/pgutil"
"storj.io/storj/private/tagsql"
@ -56,7 +57,8 @@ func OpenUnique(ctx context.Context, connStr string, schemaPrefix string) (db *d
}
cleanup := func(cleanupDB tagsql.DB) error {
_, err := cleanupDB.Exec(context.TODO(), "DROP DATABASE "+pgutil.QuoteIdentifier(schemaName))
ctx := context2.WithoutCancellation(ctx)
_, err := cleanupDB.Exec(ctx, "DROP DATABASE "+pgutil.QuoteIdentifier(schemaName))
return errs.Wrap(err)
}
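Note: the cleanup now detaches from the caller's context so the DROP DATABASE statement still runs after the test context is canceled. A small self-contained sketch of the assumed semantics of context2.WithoutCancellation (the package imported above):

package main

import (
    "context"
    "fmt"

    "storj.io/common/context2"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    cancel()

    detached := context2.WithoutCancellation(ctx)
    fmt.Println(ctx.Err())      // context.Canceled
    fmt.Println(detached.Err()) // <nil>, so cleanup queries are not aborted
}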

View File

@ -362,7 +362,7 @@ func TestUsageRollups(t *testing.T) {
)
now := time.Now()
start := now.Add(tallyInterval * time.Duration(-tallyIntervals))
start := now.Add(tallyInterval * -tallyIntervals)
db := planet.Satellites[0].DB

View File

@ -94,7 +94,7 @@ func (verifier *Verifier) Verify(ctx context.Context, path storj.Path, skip map[
}
return Report{}, err
}
if pointer.ExpirationDate != (time.Time{}) && pointer.ExpirationDate.Before(time.Now()) {
if !pointer.ExpirationDate.IsZero() && pointer.ExpirationDate.Before(time.Now()) {
verifier.log.Debug("segment expired before Verify")
return Report{}, nil
}
@ -368,7 +368,7 @@ func (verifier *Verifier) Reverify(ctx context.Context, path storj.Path) (report
}
return Report{}, err
}
if pointer.ExpirationDate != (time.Time{}) && pointer.ExpirationDate.Before(time.Now()) {
if !pointer.ExpirationDate.IsZero() && pointer.ExpirationDate.Before(time.Now()) {
verifier.log.Debug("Segment expired before Reverify")
return Report{}, nil
}
@ -402,7 +402,7 @@ func (verifier *Verifier) Reverify(ctx context.Context, path storj.Path) (report
verifier.log.Debug("Reverify: error getting pending pointer from metainfo", zap.Stringer("Node ID", pending.NodeID), zap.Error(err))
return
}
if pendingPointer.ExpirationDate != (time.Time{}) && pendingPointer.ExpirationDate.Before(time.Now().UTC()) {
if !pendingPointer.ExpirationDate.IsZero() && pendingPointer.ExpirationDate.Before(time.Now().UTC()) {
verifier.log.Debug("Reverify: segment already expired", zap.Stringer("Node ID", pending.NodeID))
ch <- result{nodeID: pending.NodeID, status: skipped}
return
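Note: IsZero reports whether the timestamp is the zero time instant, whereas == on time.Time also compares the Location and monotonic clock fields, which is why the rewritten checks are the safer form. A minimal sketch:

var expiration time.Time // unset expiration date
if !expiration.IsZero() && expiration.Before(time.Now()) {
    // segment already expired; skip it
}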

View File

@ -26,21 +26,18 @@ type CreateUser struct {
// IsValid checks CreateUser validity and returns error describing whats wrong.
func (user *CreateUser) IsValid() error {
var errors []error
errors = append(errors, console.ValidateFullName(user.FullName))
errors = append(errors, console.ValidatePassword(user.Password))
var group errs.Group
group.Add(console.ValidateFullName(user.FullName))
group.Add(console.ValidatePassword(user.Password))
// validate email
_, err := mail.ParseAddress(user.Email)
errors = append(errors, err)
group.Add(err)
if user.ReferralToken != "" {
_, err := uuid.FromString(user.ReferralToken)
if err != nil {
errors = append(errors, err)
}
group.Add(err)
}
return errs.Combine(errors...)
return group.Err()
}
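Note: errs.Group (github.com/zeebo/errs) ignores nil errors on Add, and its Err returns nil, the single error, or a combined error, which replaces the slice-plus-errs.Combine pattern above. A small self-contained sketch:

package main

import (
    "errors"
    "fmt"

    "github.com/zeebo/errs"
)

func main() {
    var group errs.Group
    group.Add(nil) // ignored
    group.Add(errors.New("full name missing"))
    group.Add(errors.New("password too short"))
    fmt.Println(group.Err()) // both messages, combined into one error
}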

View File

@ -14,6 +14,7 @@ import (
"storj.io/common/errs2"
"storj.io/common/pb"
"storj.io/common/storj"
"storj.io/common/sync2"
"storj.io/storj/satellite/internalpb"
"storj.io/storj/satellite/metainfo"
@ -30,19 +31,6 @@ var (
mon = monkit.Package()
)
// durabilityStats remote segment information.
type durabilityStats struct {
objectsChecked int64
remoteSegmentsChecked int64
remoteSegmentsNeedingRepair int64
newRemoteSegmentsNeedingRepair int64
remoteSegmentsLost int64
remoteSegmentsFailedToCheck int64
remoteSegmentInfo []metabase.ObjectLocation
// remoteSegmentsOverThreshold[0]=# of healthy=rt+1, remoteSegmentsOverThreshold[1]=# of healthy=rt+2, etc...
remoteSegmentsOverThreshold [5]int64
}
// Checker contains the information needed to do checks for missing pieces.
//
// architecture: Chore
@ -52,6 +40,7 @@ type Checker struct {
irrdb irreparable.DB
metaLoop *metainfo.Loop
nodestate *ReliabilityCache
statsCollector *statsCollector
repairOverrides RepairOverridesMap
nodeFailureRate float64
Loop *sync2.Cycle
@ -67,6 +56,7 @@ func NewChecker(logger *zap.Logger, repairQueue queue.RepairQueue, irrdb irrepar
irrdb: irrdb,
metaLoop: metaLoop,
nodestate: NewReliabilityCache(overlay, config.ReliabilityCacheStaleness),
statsCollector: newStatsCollector(),
repairOverrides: config.RepairOverrides.GetMap(),
nodeFailureRate: config.NodeFailureRate,
@ -113,7 +103,8 @@ func (checker *Checker) IdentifyInjuredSegments(ctx context.Context) (err error)
repairQueue: checker.repairQueue,
irrdb: checker.irrdb,
nodestate: checker.nodestate,
monStats: durabilityStats{},
statsCollector: checker.statsCollector,
monStats: aggregateStats{},
repairOverrides: checker.repairOverrides,
nodeFailureRate: checker.nodeFailureRate,
log: checker.logger,
@ -132,6 +123,8 @@ func (checker *Checker) IdentifyInjuredSegments(ctx context.Context) (err error)
return Error.Wrap(err)
}
checker.statsCollector.collectAggregates()
mon.IntVal("remote_files_checked").Observe(observer.monStats.objectsChecked) //mon:locked
mon.IntVal("remote_segments_checked").Observe(observer.monStats.remoteSegmentsChecked) //mon:locked
mon.IntVal("remote_segments_failed_to_check").Observe(observer.monStats.remoteSegmentsFailedToCheck) //mon:locked
@ -248,12 +241,27 @@ type checkerObserver struct {
repairQueue queue.RepairQueue
irrdb irreparable.DB
nodestate *ReliabilityCache
monStats durabilityStats
statsCollector *statsCollector
monStats aggregateStats // TODO(cam): once we verify statsCollector reports data correctly, remove this
repairOverrides RepairOverridesMap
nodeFailureRate float64
log *zap.Logger
}
func (obs *checkerObserver) getStatsByRS(redundancy storj.RedundancyScheme) *stats {
rsString := getRSString(obs.loadRedundancy(redundancy))
return obs.statsCollector.getStatsByRS(rsString)
}
func (obs *checkerObserver) loadRedundancy(redundancy storj.RedundancyScheme) (int, int, int, int) {
repair := int(redundancy.RepairShares)
overrideValue := obs.repairOverrides.GetOverrideValue(redundancy)
if overrideValue != 0 {
repair = int(overrideValue)
}
return int(redundancy.RequiredShares), repair, int(redundancy.OptimalShares), int(redundancy.TotalShares)
}
func (obs *checkerObserver) RemoteSegment(ctx context.Context, segment *metainfo.Segment) (err error) {
defer mon.Task()(&ctx)(&err)
@ -262,10 +270,14 @@ func (obs *checkerObserver) RemoteSegment(ctx context.Context, segment *metainfo
return nil
}
stats := obs.getStatsByRS(segment.Redundancy)
obs.monStats.remoteSegmentsChecked++
stats.iterationAggregates.remoteSegmentsChecked++
// ensure we get values, even if only zero values, so that redash can have an alert based on this
mon.Counter("checker_segments_below_min_req").Inc(0) //mon:locked
stats.segmentsBelowMinReq.Inc(0)
pieces := segment.Pieces
if len(pieces) == 0 {
@ -285,26 +297,25 @@ func (obs *checkerObserver) RemoteSegment(ctx context.Context, segment *metainfo
missingPieces, err := obs.nodestate.MissingPieces(ctx, segment.CreationDate, pbPieces)
if err != nil {
obs.monStats.remoteSegmentsFailedToCheck++
stats.iterationAggregates.remoteSegmentsFailedToCheck++
return errs.Combine(Error.New("error getting missing pieces"), err)
}
numHealthy := len(pieces) - len(missingPieces)
mon.IntVal("checker_segment_total_count").Observe(int64(len(pieces))) //mon:locked
mon.IntVal("checker_segment_total_count").Observe(int64(len(pieces))) //mon:locked
stats.segmentTotalCount.Observe(int64(len(pieces)))
mon.IntVal("checker_segment_healthy_count").Observe(int64(numHealthy)) //mon:locked
stats.segmentHealthyCount.Observe(int64(numHealthy))
segmentAge := time.Since(segment.CreationDate)
mon.IntVal("checker_segment_age").Observe(int64(segmentAge.Seconds())) //mon:locked
stats.segmentAge.Observe(int64(segmentAge.Seconds()))
required, repairThreshold, successThreshold, _ := obs.loadRedundancy(segment.Redundancy)
redundancy := segment.Redundancy
required := int(redundancy.RequiredShares)
repairThreshold := int(redundancy.RepairShares)
overrideValue := obs.repairOverrides.GetOverrideValue(redundancy)
if overrideValue != 0 {
repairThreshold = int(overrideValue)
}
successThreshold := int(redundancy.OptimalShares)
segmentHealth := repair.SegmentHealth(numHealthy, required, obs.nodeFailureRate)
mon.FloatVal("checker_segment_health").Observe(segmentHealth) //mon:locked
stats.segmentHealth.Observe(segmentHealth)
key := segment.Location.Encode()
// we repair when the number of healthy pieces is less than or equal to the repair threshold and is greater or equal to
@ -312,12 +323,14 @@ func (obs *checkerObserver) RemoteSegment(ctx context.Context, segment *metainfo
// except for the case when the repair and success thresholds are the same (a case usually seen during testing)
if numHealthy >= required && numHealthy <= repairThreshold && numHealthy < successThreshold {
mon.FloatVal("checker_injured_segment_health").Observe(segmentHealth) //mon:locked
stats.injuredSegmentHealth.Observe(segmentHealth)
obs.monStats.remoteSegmentsNeedingRepair++
stats.iterationAggregates.remoteSegmentsNeedingRepair++
alreadyInserted, err := obs.repairQueue.Insert(ctx, &internalpb.InjuredSegment{
Path: key,
LostPieces: missingPieces,
InsertedTime: time.Now().UTC(),
}, segmentHealth)
}, float64(numHealthy))
if err != nil {
obs.log.Error("error adding injured segment to queue", zap.Error(err))
return nil
@ -325,6 +338,7 @@ func (obs *checkerObserver) RemoteSegment(ctx context.Context, segment *metainfo
if !alreadyInserted {
obs.monStats.newRemoteSegmentsNeedingRepair++
stats.iterationAggregates.newRemoteSegmentsNeedingRepair++
}
// delete always returns nil when something was deleted and also when element didn't exists
@ -338,6 +352,9 @@ func (obs *checkerObserver) RemoteSegment(ctx context.Context, segment *metainfo
if !containsObjectLocation(obs.monStats.remoteSegmentInfo, lostSegInfo) {
obs.monStats.remoteSegmentInfo = append(obs.monStats.remoteSegmentInfo, lostSegInfo)
}
if !containsObjectLocation(stats.iterationAggregates.remoteSegmentInfo, lostSegInfo) {
stats.iterationAggregates.remoteSegmentInfo = append(stats.iterationAggregates.remoteSegmentInfo, lostSegInfo)
}
var segmentAge time.Duration
if segment.CreationDate.Before(segment.LastRepaired) {
@ -346,9 +363,14 @@ func (obs *checkerObserver) RemoteSegment(ctx context.Context, segment *metainfo
segmentAge = time.Since(segment.CreationDate)
}
mon.IntVal("checker_segment_time_until_irreparable").Observe(int64(segmentAge.Seconds())) //mon:locked
stats.segmentTimeUntilIrreparable.Observe(int64(segmentAge.Seconds()))
obs.monStats.remoteSegmentsLost++
stats.iterationAggregates.remoteSegmentsLost++
mon.Counter("checker_segments_below_min_req").Inc(1) //mon:locked
stats.segmentsBelowMinReq.Inc(1)
// make an entry into the irreparable table
segmentInfo := &internalpb.IrreparableSegment{
Path: key,
@ -364,13 +386,25 @@ func (obs *checkerObserver) RemoteSegment(ctx context.Context, segment *metainfo
obs.log.Error("error handling irreparable segment to queue", zap.Error(err))
return nil
}
} else if numHealthy > repairThreshold && numHealthy <= (repairThreshold+len(obs.monStats.remoteSegmentsOverThreshold)) {
// record metrics for segments right above repair threshold
// numHealthy=repairThreshold+1 through numHealthy=repairThreshold+5
for i := range obs.monStats.remoteSegmentsOverThreshold {
if numHealthy == (repairThreshold + i + 1) {
obs.monStats.remoteSegmentsOverThreshold[i]++
break
} else {
if numHealthy > repairThreshold && numHealthy <= (repairThreshold+len(obs.monStats.remoteSegmentsOverThreshold)) {
// record metrics for segments right above repair threshold
// numHealthy=repairThreshold+1 through numHealthy=repairThreshold+5
for i := range obs.monStats.remoteSegmentsOverThreshold {
if numHealthy == (repairThreshold + i + 1) {
obs.monStats.remoteSegmentsOverThreshold[i]++
break
}
}
}
if numHealthy > repairThreshold && numHealthy <= (repairThreshold+len(stats.iterationAggregates.remoteSegmentsOverThreshold)) {
// record metrics for segments right above repair threshold
// numHealthy=repairThreshold+1 through numHealthy=repairThreshold+5
for i := range stats.iterationAggregates.remoteSegmentsOverThreshold {
if numHealthy == (repairThreshold + i + 1) {
stats.iterationAggregates.remoteSegmentsOverThreshold[i]++
break
}
}
}
}
@ -383,6 +417,9 @@ func (obs *checkerObserver) Object(ctx context.Context, object *metainfo.Object)
obs.monStats.objectsChecked++
stats := obs.getStatsByRS(object.LastSegment.Redundancy)
stats.iterationAggregates.objectsChecked++
return nil
}
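Note: a worked example of the repair decision in RemoteSegment, with hypothetical numbers and no repair override configured:

// loadRedundancy for an RS scheme of 29/35/80/110 with no override value:
required, repairThreshold, successThreshold := 29, 35, 80

numHealthy := 33
needsRepair := numHealthy >= required &&
    numHealthy <= repairThreshold &&
    numHealthy < successThreshold // true: 29 <= 33 <= 35 and 33 < 80
_ = needsRepair

// With numHealthy == 37 the segment is not queued; it is counted in
// remoteSegmentsOverThreshold[1] instead (healthy == repairThreshold + 2).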

View File

@ -0,0 +1,166 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package checker
import (
"fmt"
"github.com/spacemonkeygo/monkit/v3"
"storj.io/storj/satellite/metainfo/metabase"
)
// statsCollector holds a *stats for each redundancy scheme
// seen by the checker. These are chained into the monkit scope for
// monitoring as they are initialized.
type statsCollector struct {
stats map[string]*stats
}
func newStatsCollector() *statsCollector {
return &statsCollector{
stats: make(map[string]*stats),
}
}
func (collector *statsCollector) getStatsByRS(rs string) *stats {
stats, ok := collector.stats[rs]
if !ok {
stats = newStats(rs)
mon.Chain(stats)
collector.stats[rs] = stats
}
return stats
}
// collectAggregates transfers the iteration aggregates into the
// respective stats monkit metrics at the end of each checker iteration.
// iterationAggregates is then cleared.
func (collector *statsCollector) collectAggregates() {
for _, stats := range collector.stats {
stats.collectAggregates()
stats.iterationAggregates = new(aggregateStats)
}
}
// stats is used for collecting and reporting checker metrics.
//
// add any new metrics tagged with rs_scheme to this struct and set them
// in newStats.
type stats struct {
iterationAggregates *aggregateStats
objectsChecked *monkit.IntVal
remoteSegmentsChecked *monkit.IntVal
remoteSegmentsNeedingRepair *monkit.IntVal
newRemoteSegmentsNeedingRepair *monkit.IntVal
remoteSegmentsLost *monkit.IntVal
objectsLost *monkit.IntVal
remoteSegmentsFailedToCheck *monkit.IntVal
remoteSegmentsHealthyPercentage *monkit.FloatVal
// remoteSegmentsOverThreshold[0]=# of healthy=rt+1, remoteSegmentsOverThreshold[1]=# of healthy=rt+2, etc...
remoteSegmentsOverThreshold1 *monkit.IntVal
remoteSegmentsOverThreshold2 *monkit.IntVal
remoteSegmentsOverThreshold3 *monkit.IntVal
remoteSegmentsOverThreshold4 *monkit.IntVal
remoteSegmentsOverThreshold5 *monkit.IntVal
segmentsBelowMinReq *monkit.Counter
segmentTotalCount *monkit.IntVal
segmentHealthyCount *monkit.IntVal
segmentAge *monkit.IntVal
segmentHealth *monkit.FloatVal
injuredSegmentHealth *monkit.FloatVal
segmentTimeUntilIrreparable *monkit.IntVal
}
// aggregateStats tallies data over the full checker iteration.
type aggregateStats struct {
objectsChecked int64
remoteSegmentsChecked int64
remoteSegmentsNeedingRepair int64
newRemoteSegmentsNeedingRepair int64
remoteSegmentsLost int64
remoteSegmentsFailedToCheck int64
remoteSegmentInfo []metabase.ObjectLocation
// remoteSegmentsOverThreshold[0]=# of healthy=rt+1, remoteSegmentsOverThreshold[1]=# of healthy=rt+2, etc...
remoteSegmentsOverThreshold [5]int64
}
func newStats(rs string) *stats {
return &stats{
iterationAggregates: new(aggregateStats),
objectsChecked: monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "remote_objects_checked").WithTag("rs_scheme", rs)),
remoteSegmentsChecked: monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "remote_segments_checked").WithTag("rs_scheme", rs)),
remoteSegmentsNeedingRepair: monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "remote_segments_needing_repair").WithTag("rs_scheme", rs)),
newRemoteSegmentsNeedingRepair: monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "new_remote_segments_needing_repair").WithTag("rs_scheme", rs)),
remoteSegmentsLost: monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "remote_segments_lost").WithTag("rs_scheme", rs)),
objectsLost: monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "objects_lost").WithTag("rs_scheme", rs)),
remoteSegmentsFailedToCheck: monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "remote_segments_failed_to_check").WithTag("rs_scheme", rs)),
remoteSegmentsHealthyPercentage: monkit.NewFloatVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "remote_segments_healthy_percentage").WithTag("rs_scheme", rs)),
remoteSegmentsOverThreshold1: monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "remote_segments_over_threshold_1").WithTag("rs_scheme", rs)),
remoteSegmentsOverThreshold2: monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "remote_segments_over_threshold_2").WithTag("rs_scheme", rs)),
remoteSegmentsOverThreshold3: monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "remote_segments_over_threshold_3").WithTag("rs_scheme", rs)),
remoteSegmentsOverThreshold4: monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "remote_segments_over_threshold_4").WithTag("rs_scheme", rs)),
remoteSegmentsOverThreshold5: monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "remote_segments_over_threshold_5").WithTag("rs_scheme", rs)),
segmentsBelowMinReq: monkit.NewCounter(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "checker_segments_below_min_req").WithTag("rs_scheme", rs)),
segmentTotalCount: monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "checker_segment_total_count").WithTag("rs_scheme", rs)),
segmentHealthyCount: monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "checker_segment_healthy_count").WithTag("rs_scheme", rs)),
segmentAge: monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "checker_segment_age").WithTag("rs_scheme", rs)),
segmentHealth: monkit.NewFloatVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "checker_segment_health").WithTag("rs_scheme", rs)),
injuredSegmentHealth: monkit.NewFloatVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "checker_injured_segment_health").WithTag("rs_scheme", rs)),
segmentTimeUntilIrreparable: monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "checker_segment_time_until_irreparable").WithTag("rs_scheme", rs)),
}
}
func (stats *stats) collectAggregates() {
stats.objectsChecked.Observe(stats.iterationAggregates.objectsChecked)
stats.remoteSegmentsChecked.Observe(stats.iterationAggregates.remoteSegmentsChecked)
stats.remoteSegmentsNeedingRepair.Observe(stats.iterationAggregates.remoteSegmentsNeedingRepair)
stats.newRemoteSegmentsNeedingRepair.Observe(stats.iterationAggregates.newRemoteSegmentsNeedingRepair)
stats.remoteSegmentsLost.Observe(stats.iterationAggregates.remoteSegmentsLost)
stats.objectsLost.Observe(int64(len(stats.iterationAggregates.remoteSegmentInfo)))
stats.remoteSegmentsFailedToCheck.Observe(stats.iterationAggregates.remoteSegmentsFailedToCheck)
stats.remoteSegmentsOverThreshold1.Observe(stats.iterationAggregates.remoteSegmentsOverThreshold[0])
stats.remoteSegmentsOverThreshold2.Observe(stats.iterationAggregates.remoteSegmentsOverThreshold[1])
stats.remoteSegmentsOverThreshold3.Observe(stats.iterationAggregates.remoteSegmentsOverThreshold[2])
stats.remoteSegmentsOverThreshold4.Observe(stats.iterationAggregates.remoteSegmentsOverThreshold[3])
stats.remoteSegmentsOverThreshold5.Observe(stats.iterationAggregates.remoteSegmentsOverThreshold[4])
allUnhealthy := stats.iterationAggregates.remoteSegmentsNeedingRepair + stats.iterationAggregates.remoteSegmentsFailedToCheck
allChecked := stats.iterationAggregates.remoteSegmentsChecked
allHealthy := allChecked - allUnhealthy
stats.remoteSegmentsHealthyPercentage.Observe(100 * float64(allHealthy) / float64(allChecked))
}
// Stats implements the monkit.StatSource interface.
func (stats *stats) Stats(cb func(key monkit.SeriesKey, field string, val float64)) {
stats.objectsChecked.Stats(cb)
stats.remoteSegmentsChecked.Stats(cb)
stats.remoteSegmentsNeedingRepair.Stats(cb)
stats.newRemoteSegmentsNeedingRepair.Stats(cb)
stats.remoteSegmentsLost.Stats(cb)
stats.objectsLost.Stats(cb)
stats.remoteSegmentsFailedToCheck.Stats(cb)
stats.remoteSegmentsOverThreshold1.Stats(cb)
stats.remoteSegmentsOverThreshold2.Stats(cb)
stats.remoteSegmentsOverThreshold3.Stats(cb)
stats.remoteSegmentsOverThreshold4.Stats(cb)
stats.remoteSegmentsOverThreshold5.Stats(cb)
stats.remoteSegmentsHealthyPercentage.Stats(cb)
stats.segmentsBelowMinReq.Stats(cb)
stats.segmentTotalCount.Stats(cb)
stats.segmentHealthyCount.Stats(cb)
stats.segmentAge.Stats(cb)
stats.segmentHealth.Stats(cb)
stats.injuredSegmentHealth.Stats(cb)
stats.segmentTimeUntilIrreparable.Stats(cb)
}
func getRSString(min, repair, success, total int) string {
return fmt.Sprintf("%d/%d/%d/%d", min, repair, success, total)
}
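Note: a short sketch of how the checker uses this file, with hypothetical numbers; the identifiers are the ones defined above and in checker.go.

collector := newStatsCollector()
rsKey := getRSString(29, 35, 80, 110) // "29/35/80/110"

// The first lookup creates the stats instance and chains it into the monkit
// scope, so every metric is tagged with rs_scheme="29/35/80/110".
stats := collector.getStatsByRS(rsKey)
stats.iterationAggregates.remoteSegmentsChecked++
stats.segmentsBelowMinReq.Inc(0)

// At the end of a checker iteration the aggregates are observed and reset.
collector.collectAggregates()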

View File

@ -143,7 +143,7 @@ func (ros *RepairOverrides) GetMap() RepairOverridesMap {
overrideMap: make(map[string]int32),
}
for _, ro := range ros.List {
key := getRSKey(ro.Min, ro.Success, ro.Total)
key := getRepairOverrideKey(ro.Min, ro.Success, ro.Total)
newMap.overrideMap[key] = ro.Override
}
return newMap
@ -158,16 +158,16 @@ type RepairOverridesMap struct {
// GetOverrideValuePB returns the override value for a pb RS scheme if it exists, or 0 otherwise.
func (rom *RepairOverridesMap) GetOverrideValuePB(rs *pb.RedundancyScheme) int32 {
key := getRSKey(int(rs.MinReq), int(rs.SuccessThreshold), int(rs.Total))
key := getRepairOverrideKey(int(rs.MinReq), int(rs.SuccessThreshold), int(rs.Total))
return rom.overrideMap[key]
}
// GetOverrideValue returns the override value for an RS scheme if it exists, or 0 otherwise.
func (rom *RepairOverridesMap) GetOverrideValue(rs storj.RedundancyScheme) int32 {
key := getRSKey(int(rs.RequiredShares), int(rs.OptimalShares), int(rs.TotalShares))
key := getRepairOverrideKey(int(rs.RequiredShares), int(rs.OptimalShares), int(rs.TotalShares))
return rom.overrideMap[key]
}
func getRSKey(min, success, total int) string {
func getRepairOverrideKey(min, success, total int) string {
return fmt.Sprintf("%d/%d/%d", min, success, total)
}

View File

@ -17,7 +17,7 @@ make -C "$SCRIPTDIR"/.. install-sim
echo "Overriding default max segment size to 6MiB"
GOBIN=$TMP go install -v -ldflags "-X 'storj.io/uplink.maxSegmentSize=6MiB'" storj.io/storj/cmd/uplink
# use modifed version of uplink
# use modified version of uplink
export PATH=$TMP:$PATH
export STORJ_NETWORK_DIR=$TMP

View File

@ -395,7 +395,7 @@ func TestBandwidthDailyRollups(t *testing.T) {
require.NoError(t, err)
// last day add bandwidth that won't be rolled up
day := startDate.Add(time.Hour * 24 * time.Duration(days-1))
day := startDate.Add(time.Hour * 24 * (days - 1))
for _, satellite := range satellites {
usageRollup := &bandwidth.UsageRollup{

View File

@ -351,7 +351,7 @@ func TestHeldAmountApi(t *testing.T) {
})
t.Run("test HeldbackHistory", func(t *testing.T) {
date := time.Now().UTC().AddDate(0, -2, 0)
date := time.Now().UTC().AddDate(0, -2, 0).Round(time.Minute)
err = reputationDB.Store(context.Background(), reputation.Stats{
SatelliteID: satellite.ID(),
JoinedAt: date,
@ -376,15 +376,8 @@ func TestHeldAmountApi(t *testing.T) {
JoinedAt: date.Round(time.Minute),
}
stefanID, err := storj.NodeIDFromString("118UWpMCHzs6CvSgWd9BfFVjw5K9pZbJjkfZJexMtSkmKxvvAW")
require.NoError(t, err)
held2 := payout.SatelliteHeldHistory{
SatelliteID: stefanID,
}
var periods []payout.SatelliteHeldHistory
periods = append(periods, held, held2)
periods = append(periods, held)
expected, err := json.Marshal(periods)
require.NoError(t, err)

View File

@ -180,6 +180,10 @@ func (service *Service) AllHeldbackHistory(ctx context.Context) (result []Satell
return nil, ErrPayoutService.Wrap(err)
}
if helds == nil {
continue
}
disposed, err := service.db.SatellitesDisposedHistory(ctx, satellitesIDs[i])
if err != nil {
return nil, ErrPayoutService.Wrap(err)
@ -202,6 +206,7 @@ func (service *Service) AllHeldbackHistory(ctx context.Context) (result []Satell
history.TotalDisposed = disposed
history.SatelliteID = satellitesIDs[i]
history.SatelliteName = "stefan-benten"
if satellitesIDs[i] != service.stefanSatellite {
url, err := service.trust.GetNodeURL(ctx, satellitesIDs[i])