versioncontrol: implement partial version rollout automation

see https://github.com/storj/storj/blob/main/docs/blueprints/rollout-automation.md
for context.

there are new config values, but the default values make the
system behave as if there were no changes. existing rollout
configs should work just fine.

Change-Id: I8477adf8b05b5affa5bca1c1a106810c934e5845
This commit is contained in:
JT Olio 2023-05-16 16:25:22 -04:00 committed by Storj Robot
parent 36038af3d1
commit 3566bbf968

View File

@ -13,6 +13,8 @@ import (
"net/http"
"reflect"
"strings"
"sync"
"time"
"github.com/gorilla/mux"
"github.com/zeebo/errs"
@ -20,6 +22,7 @@ import (
"golang.org/x/sync/errgroup"
"storj.io/common/errs2"
"storj.io/common/sync2"
"storj.io/private/version"
)
@ -35,7 +38,10 @@ var (
// Config is all the configuration parameters for a Version Control Server.
type Config struct {
Address string `user:"true" help:"public address to listen on" default:":8080"`
Address string `user:"true" help:"public address to listen on" default:":8080"`
SafeRate float64 `user:"true" help:"the safe daily fractional increase for a rollout (a value of .5 means 0 to 50% in 24 hours). 0 means immediate rollout." default:".2"`
RegenInterval time.Duration `user:"true" help:"how long to go between recalculating the current cursors. 0 means on demand." default:"5m"`
Versions OldVersionConfig
Binary ProcessesConfig
@ -77,8 +83,16 @@ type VersionConfig struct {
// RolloutConfig represents the state of a version rollout configuration of a process.
type RolloutConfig struct {
Seed string `user:"true" help:"random 32 byte, hex-encoded string"`
Cursor int `user:"true" help:"percentage of nodes which should roll-out to the suggested version" default:"0"`
Seed string `user:"true" help:"random 32 byte, hex-encoded string"`
PreviousCursor int `user:"true" help:"prior configuration's cursor value. if 100%, will be capped at the current cursor." default:"100"`
Cursor int `user:"true" help:"percentage of nodes which should roll-out to the suggested version" default:"0"`
}
// response invariant: the struct or its data is never modified after creation.
type response struct {
versions version.AllowedVersions
// serialized contains the byte version of current allowed versions.
serialized []byte
}
// Peer is the representation of a VersionControl Server.
@ -94,10 +108,13 @@ type Peer struct {
Listener net.Listener
}
Versions version.AllowedVersions
config Config
initTime time.Time
// response contains the byte version of current allowed versions
response []byte
regenLoop *sync2.Cycle
mu sync.Mutex
response *response
}
// New creates a new VersionControl Server.
@ -107,73 +124,17 @@ func New(log *zap.Logger, config *Config) (peer *Peer, err error) {
}
peer = &Peer{
Log: log,
Log: log,
config: *config,
initTime: time.Now(),
regenLoop: sync2.NewCycle(config.RegenInterval),
}
// Convert each Service's VersionConfig String to SemVer
peer.Versions.Satellite, err = version.NewOldSemVer(config.Versions.Satellite)
err = peer.updateResponse()
if err != nil {
return &Peer{}, err
return nil, err
}
peer.Versions.Storagenode, err = version.NewOldSemVer(config.Versions.Storagenode)
if err != nil {
return &Peer{}, err
}
peer.Versions.Uplink, err = version.NewOldSemVer(config.Versions.Uplink)
if err != nil {
return &Peer{}, err
}
peer.Versions.Gateway, err = version.NewOldSemVer(config.Versions.Gateway)
if err != nil {
return &Peer{}, err
}
peer.Versions.Identity, err = version.NewOldSemVer(config.Versions.Identity)
if err != nil {
return &Peer{}, err
}
peer.Versions.Processes.Satellite, err = configToProcess(config.Binary.Satellite)
if err != nil {
return nil, RolloutErr.Wrap(err)
}
peer.Versions.Processes.Storagenode, err = configToProcess(config.Binary.Storagenode)
if err != nil {
return nil, RolloutErr.Wrap(err)
}
peer.Versions.Processes.StoragenodeUpdater, err = configToProcess(config.Binary.StoragenodeUpdater)
if err != nil {
return nil, RolloutErr.Wrap(err)
}
peer.Versions.Processes.Uplink, err = configToProcess(config.Binary.Uplink)
if err != nil {
return nil, RolloutErr.Wrap(err)
}
peer.Versions.Processes.Gateway, err = configToProcess(config.Binary.Gateway)
if err != nil {
return nil, RolloutErr.Wrap(err)
}
peer.Versions.Processes.Identity, err = configToProcess(config.Binary.Identity)
if err != nil {
return nil, RolloutErr.Wrap(err)
}
peer.response, err = json.Marshal(peer.Versions)
if err != nil {
peer.Log.Error("Error marshalling version info.", zap.Error(err))
return nil, RolloutErr.Wrap(err)
}
peer.Log.Debug("Setting version info.", zap.ByteString("Value", peer.response))
{
router := mux.NewRouter()
router.HandleFunc("/", peer.versionHandle).Methods(http.MethodGet)
@ -192,11 +153,105 @@ func New(log *zap.Logger, config *Config) (peer *Peer, err error) {
return peer, nil
}
func (peer *Peer) getResponse() *response {
if peer.config.RegenInterval <= 0 && peer.config.SafeRate > 0 {
// generate on demand.
if err := peer.updateResponse(); err != nil {
peer.Log.Error("Error updating config.", zap.Error(err))
}
}
peer.mu.Lock()
defer peer.mu.Unlock()
return peer.response
}
func (peer *Peer) updateResponse() (err error) {
response, err := peer.config.generateResponse(peer.initTime)
if err != nil {
peer.Log.Error("Error updating response.", zap.Error(err))
return err
}
peer.Log.Debug("Setting version info.", zap.ByteString("Value", response.serialized))
peer.mu.Lock()
defer peer.mu.Unlock()
peer.response = response
return nil
}
func (config *Config) generateResponse(initTime time.Time) (rv *response, err error) {
rv = &response{}
// Convert each Service's VersionConfig String to SemVer
rv.versions.Satellite, err = version.NewOldSemVer(config.Versions.Satellite)
if err != nil {
return nil, err
}
rv.versions.Storagenode, err = version.NewOldSemVer(config.Versions.Storagenode)
if err != nil {
return nil, err
}
rv.versions.Uplink, err = version.NewOldSemVer(config.Versions.Uplink)
if err != nil {
return nil, err
}
rv.versions.Gateway, err = version.NewOldSemVer(config.Versions.Gateway)
if err != nil {
return nil, err
}
rv.versions.Identity, err = version.NewOldSemVer(config.Versions.Identity)
if err != nil {
return nil, err
}
rv.versions.Processes.Satellite, err = config.configToProcess(initTime, config.Binary.Satellite)
if err != nil {
return nil, RolloutErr.Wrap(err)
}
rv.versions.Processes.Storagenode, err = config.configToProcess(initTime, config.Binary.Storagenode)
if err != nil {
return nil, RolloutErr.Wrap(err)
}
rv.versions.Processes.StoragenodeUpdater, err = config.configToProcess(initTime, config.Binary.StoragenodeUpdater)
if err != nil {
return nil, RolloutErr.Wrap(err)
}
rv.versions.Processes.Uplink, err = config.configToProcess(initTime, config.Binary.Uplink)
if err != nil {
return nil, RolloutErr.Wrap(err)
}
rv.versions.Processes.Gateway, err = config.configToProcess(initTime, config.Binary.Gateway)
if err != nil {
return nil, RolloutErr.Wrap(err)
}
rv.versions.Processes.Identity, err = config.configToProcess(initTime, config.Binary.Identity)
if err != nil {
return nil, RolloutErr.Wrap(err)
}
rv.serialized, err = json.Marshal(rv.versions)
if err != nil {
return nil, RolloutErr.Wrap(err)
}
return rv, nil
}
// versionHandle handles all process versions request.
func (peer *Peer) versionHandle(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
_, err := w.Write(peer.response)
_, err := w.Write(peer.getResponse().serialized)
if err != nil {
peer.Log.Error("Error writing response to client.", zap.Error(err))
}
@ -208,20 +263,22 @@ func (peer *Peer) processURLHandle(w http.ResponseWriter, r *http.Request) {
service := params["service"]
versionType := params["version"]
response := peer.getResponse()
var process version.Process
switch service {
case "satellite":
process = peer.Versions.Processes.Satellite
process = response.versions.Processes.Satellite
case "storagenode":
process = peer.Versions.Processes.Storagenode
process = response.versions.Processes.Storagenode
case "storagenode-updater":
process = peer.Versions.Processes.StoragenodeUpdater
process = response.versions.Processes.StoragenodeUpdater
case "uplink":
process = peer.Versions.Processes.Uplink
process = response.versions.Processes.Uplink
case "gateway":
process = peer.Versions.Processes.Gateway
process = response.versions.Processes.Gateway
case "identity":
process = peer.Versions.Processes.Identity
process = response.versions.Processes.Identity
default:
http.Error(w, "service does not exists", http.StatusNotFound)
return
@ -285,6 +342,14 @@ func (peer *Peer) Run(ctx context.Context) (err error) {
}
return err
})
if peer.config.RegenInterval > 0 {
group.Go(func() error {
defer cancel()
return peer.regenLoop.Run(ctx, func(ctx context.Context) error {
return peer.updateResponse()
})
})
}
return group.Wait()
}
@ -332,14 +397,19 @@ func (rollout RolloutConfig) Validate() error {
if rollout.Cursor < 0 || rollout.Cursor > 100 {
return RolloutErr.New("invalid cursor percentage: %d", rollout.Cursor)
}
if rollout.PreviousCursor < 0 || rollout.PreviousCursor > 100 {
return RolloutErr.New("invalid previous cursor percentage: %d", rollout.PreviousCursor)
}
if _, err := hex.DecodeString(rollout.Seed); err != nil {
return RolloutErr.New("invalid seed: %s", rollout.Seed)
return RolloutErr.New("invalid seed: %q", rollout.Seed)
}
return nil
}
func configToProcess(binary ProcessConfig) (version.Process, error) {
func (config *Config) configToProcess(initTime time.Time, binary ProcessConfig) (version.Process, error) {
currentPercent := calculateRolloutCursor(initTime, binary, config.SafeRate)
process := version.Process{
Minimum: version.Version{
Version: binary.Minimum.Version,
@ -350,7 +420,7 @@ func configToProcess(binary ProcessConfig) (version.Process, error) {
URL: binary.Suggested.URL,
},
Rollout: version.Rollout{
Cursor: version.PercentageToCursor(binary.Rollout.Cursor),
Cursor: version.PercentageToCursor(int(currentPercent)),
},
}
@ -361,3 +431,38 @@ func configToProcess(binary ProcessConfig) (version.Process, error) {
copy(process.Rollout.Seed[:], seedBytes)
return process, nil
}
func calculateRolloutCursor(initTime time.Time, binary ProcessConfig, safeRate float64) float64 {
targetPercent := float64(binary.Rollout.Cursor)
previousPercent := float64(binary.Rollout.PreviousCursor)
if previousPercent > targetPercent {
previousPercent = targetPercent
}
elapsed := time.Since(initTime)
currentPercent := targetPercent
safePercentPerDay := safeRate * 100
if safePercentPerDay > 0 {
// first calculate targetTime:
targetTimeInDaysFromNow := (targetPercent - previousPercent) / safePercentPerDay
targetTime := time.Duration(targetTimeInDaysFromNow * 24 * float64(time.Hour))
if targetTime > 0 {
// now calculate the current percent based on how close targetTime is.
currentPercent = clampedLinearInterp(float64(elapsed)/float64(targetTime), previousPercent, targetPercent)
}
}
return currentPercent
}
func clampedLinearInterp(frac, low, high float64) float64 {
v := (high-low)*frac + low
if v < low {
return low
}
if v > high {
return high
}
return v
}