diff --git a/versioncontrol/peer.go b/versioncontrol/peer.go index da0e4c1ac..8441094a9 100644 --- a/versioncontrol/peer.go +++ b/versioncontrol/peer.go @@ -13,6 +13,8 @@ import ( "net/http" "reflect" "strings" + "sync" + "time" "github.com/gorilla/mux" "github.com/zeebo/errs" @@ -20,6 +22,7 @@ import ( "golang.org/x/sync/errgroup" "storj.io/common/errs2" + "storj.io/common/sync2" "storj.io/private/version" ) @@ -35,7 +38,10 @@ var ( // Config is all the configuration parameters for a Version Control Server. type Config struct { - Address string `user:"true" help:"public address to listen on" default:":8080"` + Address string `user:"true" help:"public address to listen on" default:":8080"` + SafeRate float64 `user:"true" help:"the safe daily fractional increase for a rollout (a value of .5 means 0 to 50% in 24 hours). 0 means immediate rollout." default:".2"` + RegenInterval time.Duration `user:"true" help:"how long to go between recalculating the current cursors. 0 means on demand." default:"5m"` + Versions OldVersionConfig Binary ProcessesConfig @@ -77,8 +83,16 @@ type VersionConfig struct { // RolloutConfig represents the state of a version rollout configuration of a process. type RolloutConfig struct { - Seed string `user:"true" help:"random 32 byte, hex-encoded string"` - Cursor int `user:"true" help:"percentage of nodes which should roll-out to the suggested version" default:"0"` + Seed string `user:"true" help:"random 32 byte, hex-encoded string"` + PreviousCursor int `user:"true" help:"prior configuration's cursor value. if 100%, will be capped at the current cursor." default:"100"` + Cursor int `user:"true" help:"percentage of nodes which should roll-out to the suggested version" default:"0"` +} + +// response invariant: the struct or its data is never modified after creation. +type response struct { + versions version.AllowedVersions + // serialized contains the byte version of current allowed versions. + serialized []byte } // Peer is the representation of a VersionControl Server. @@ -94,10 +108,13 @@ type Peer struct { Listener net.Listener } - Versions version.AllowedVersions + config Config + initTime time.Time - // response contains the byte version of current allowed versions - response []byte + regenLoop *sync2.Cycle + + mu sync.Mutex + response *response } // New creates a new VersionControl Server. @@ -107,73 +124,17 @@ func New(log *zap.Logger, config *Config) (peer *Peer, err error) { } peer = &Peer{ - Log: log, + Log: log, + config: *config, + initTime: time.Now(), + regenLoop: sync2.NewCycle(config.RegenInterval), } - // Convert each Service's VersionConfig String to SemVer - peer.Versions.Satellite, err = version.NewOldSemVer(config.Versions.Satellite) + err = peer.updateResponse() if err != nil { - return &Peer{}, err + return nil, err } - peer.Versions.Storagenode, err = version.NewOldSemVer(config.Versions.Storagenode) - if err != nil { - return &Peer{}, err - } - - peer.Versions.Uplink, err = version.NewOldSemVer(config.Versions.Uplink) - if err != nil { - return &Peer{}, err - } - - peer.Versions.Gateway, err = version.NewOldSemVer(config.Versions.Gateway) - if err != nil { - return &Peer{}, err - } - - peer.Versions.Identity, err = version.NewOldSemVer(config.Versions.Identity) - if err != nil { - return &Peer{}, err - } - - peer.Versions.Processes.Satellite, err = configToProcess(config.Binary.Satellite) - if err != nil { - return nil, RolloutErr.Wrap(err) - } - - peer.Versions.Processes.Storagenode, err = configToProcess(config.Binary.Storagenode) - if err != nil { - return nil, RolloutErr.Wrap(err) - } - - peer.Versions.Processes.StoragenodeUpdater, err = configToProcess(config.Binary.StoragenodeUpdater) - if err != nil { - return nil, RolloutErr.Wrap(err) - } - - peer.Versions.Processes.Uplink, err = configToProcess(config.Binary.Uplink) - if err != nil { - return nil, RolloutErr.Wrap(err) - } - - peer.Versions.Processes.Gateway, err = configToProcess(config.Binary.Gateway) - if err != nil { - return nil, RolloutErr.Wrap(err) - } - - peer.Versions.Processes.Identity, err = configToProcess(config.Binary.Identity) - if err != nil { - return nil, RolloutErr.Wrap(err) - } - - peer.response, err = json.Marshal(peer.Versions) - if err != nil { - peer.Log.Error("Error marshalling version info.", zap.Error(err)) - return nil, RolloutErr.Wrap(err) - } - - peer.Log.Debug("Setting version info.", zap.ByteString("Value", peer.response)) - { router := mux.NewRouter() router.HandleFunc("/", peer.versionHandle).Methods(http.MethodGet) @@ -192,11 +153,105 @@ func New(log *zap.Logger, config *Config) (peer *Peer, err error) { return peer, nil } +func (peer *Peer) getResponse() *response { + if peer.config.RegenInterval <= 0 && peer.config.SafeRate > 0 { + // generate on demand. + if err := peer.updateResponse(); err != nil { + peer.Log.Error("Error updating config.", zap.Error(err)) + } + } + + peer.mu.Lock() + defer peer.mu.Unlock() + return peer.response +} + +func (peer *Peer) updateResponse() (err error) { + response, err := peer.config.generateResponse(peer.initTime) + if err != nil { + peer.Log.Error("Error updating response.", zap.Error(err)) + return err + } + + peer.Log.Debug("Setting version info.", zap.ByteString("Value", response.serialized)) + peer.mu.Lock() + defer peer.mu.Unlock() + peer.response = response + return nil +} + +func (config *Config) generateResponse(initTime time.Time) (rv *response, err error) { + rv = &response{} + + // Convert each Service's VersionConfig String to SemVer + rv.versions.Satellite, err = version.NewOldSemVer(config.Versions.Satellite) + if err != nil { + return nil, err + } + + rv.versions.Storagenode, err = version.NewOldSemVer(config.Versions.Storagenode) + if err != nil { + return nil, err + } + + rv.versions.Uplink, err = version.NewOldSemVer(config.Versions.Uplink) + if err != nil { + return nil, err + } + + rv.versions.Gateway, err = version.NewOldSemVer(config.Versions.Gateway) + if err != nil { + return nil, err + } + + rv.versions.Identity, err = version.NewOldSemVer(config.Versions.Identity) + if err != nil { + return nil, err + } + + rv.versions.Processes.Satellite, err = config.configToProcess(initTime, config.Binary.Satellite) + if err != nil { + return nil, RolloutErr.Wrap(err) + } + + rv.versions.Processes.Storagenode, err = config.configToProcess(initTime, config.Binary.Storagenode) + if err != nil { + return nil, RolloutErr.Wrap(err) + } + + rv.versions.Processes.StoragenodeUpdater, err = config.configToProcess(initTime, config.Binary.StoragenodeUpdater) + if err != nil { + return nil, RolloutErr.Wrap(err) + } + + rv.versions.Processes.Uplink, err = config.configToProcess(initTime, config.Binary.Uplink) + if err != nil { + return nil, RolloutErr.Wrap(err) + } + + rv.versions.Processes.Gateway, err = config.configToProcess(initTime, config.Binary.Gateway) + if err != nil { + return nil, RolloutErr.Wrap(err) + } + + rv.versions.Processes.Identity, err = config.configToProcess(initTime, config.Binary.Identity) + if err != nil { + return nil, RolloutErr.Wrap(err) + } + + rv.serialized, err = json.Marshal(rv.versions) + if err != nil { + return nil, RolloutErr.Wrap(err) + } + + return rv, nil +} + // versionHandle handles all process versions request. func (peer *Peer) versionHandle(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") - _, err := w.Write(peer.response) + _, err := w.Write(peer.getResponse().serialized) if err != nil { peer.Log.Error("Error writing response to client.", zap.Error(err)) } @@ -208,20 +263,22 @@ func (peer *Peer) processURLHandle(w http.ResponseWriter, r *http.Request) { service := params["service"] versionType := params["version"] + response := peer.getResponse() + var process version.Process switch service { case "satellite": - process = peer.Versions.Processes.Satellite + process = response.versions.Processes.Satellite case "storagenode": - process = peer.Versions.Processes.Storagenode + process = response.versions.Processes.Storagenode case "storagenode-updater": - process = peer.Versions.Processes.StoragenodeUpdater + process = response.versions.Processes.StoragenodeUpdater case "uplink": - process = peer.Versions.Processes.Uplink + process = response.versions.Processes.Uplink case "gateway": - process = peer.Versions.Processes.Gateway + process = response.versions.Processes.Gateway case "identity": - process = peer.Versions.Processes.Identity + process = response.versions.Processes.Identity default: http.Error(w, "service does not exists", http.StatusNotFound) return @@ -285,6 +342,14 @@ func (peer *Peer) Run(ctx context.Context) (err error) { } return err }) + if peer.config.RegenInterval > 0 { + group.Go(func() error { + defer cancel() + return peer.regenLoop.Run(ctx, func(ctx context.Context) error { + return peer.updateResponse() + }) + }) + } return group.Wait() } @@ -332,14 +397,19 @@ func (rollout RolloutConfig) Validate() error { if rollout.Cursor < 0 || rollout.Cursor > 100 { return RolloutErr.New("invalid cursor percentage: %d", rollout.Cursor) } + if rollout.PreviousCursor < 0 || rollout.PreviousCursor > 100 { + return RolloutErr.New("invalid previous cursor percentage: %d", rollout.PreviousCursor) + } if _, err := hex.DecodeString(rollout.Seed); err != nil { - return RolloutErr.New("invalid seed: %s", rollout.Seed) + return RolloutErr.New("invalid seed: %q", rollout.Seed) } return nil } -func configToProcess(binary ProcessConfig) (version.Process, error) { +func (config *Config) configToProcess(initTime time.Time, binary ProcessConfig) (version.Process, error) { + currentPercent := calculateRolloutCursor(initTime, binary, config.SafeRate) + process := version.Process{ Minimum: version.Version{ Version: binary.Minimum.Version, @@ -350,7 +420,7 @@ func configToProcess(binary ProcessConfig) (version.Process, error) { URL: binary.Suggested.URL, }, Rollout: version.Rollout{ - Cursor: version.PercentageToCursor(binary.Rollout.Cursor), + Cursor: version.PercentageToCursor(int(currentPercent)), }, } @@ -361,3 +431,38 @@ func configToProcess(binary ProcessConfig) (version.Process, error) { copy(process.Rollout.Seed[:], seedBytes) return process, nil } + +func calculateRolloutCursor(initTime time.Time, binary ProcessConfig, safeRate float64) float64 { + targetPercent := float64(binary.Rollout.Cursor) + previousPercent := float64(binary.Rollout.PreviousCursor) + if previousPercent > targetPercent { + previousPercent = targetPercent + } + elapsed := time.Since(initTime) + currentPercent := targetPercent + + safePercentPerDay := safeRate * 100 + if safePercentPerDay > 0 { + // first calculate targetTime: + targetTimeInDaysFromNow := (targetPercent - previousPercent) / safePercentPerDay + targetTime := time.Duration(targetTimeInDaysFromNow * 24 * float64(time.Hour)) + + if targetTime > 0 { + // now calculate the current percent based on how close targetTime is. + currentPercent = clampedLinearInterp(float64(elapsed)/float64(targetTime), previousPercent, targetPercent) + } + } + + return currentPercent +} + +func clampedLinearInterp(frac, low, high float64) float64 { + v := (high-low)*frac + low + if v < low { + return low + } + if v > high { + return high + } + return v +}