{internal/version,versioncontrol,cmd/storagenode-updater}: add rollout to storagenode updater (#3276)

This commit is contained in:
Bryan White 2019-10-21 12:50:59 +02:00 committed by GitHub
parent 91872fcbd3
commit f468816f13
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 507 additions and 308 deletions

View File

@ -9,7 +9,6 @@ import (
"bytes"
"context"
"errors"
"fmt"
"io"
"io/ioutil"
"log"
@ -23,14 +22,23 @@ import (
"syscall"
"time"
"github.com/blang/semver"
"github.com/spf13/cobra"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/storj/internal/errs2"
"storj.io/storj/internal/fpath"
"storj.io/storj/internal/sync2"
"storj.io/storj/internal/version"
"storj.io/storj/internal/version/checker"
"storj.io/storj/pkg/cfgstruct"
"storj.io/storj/pkg/identity"
"storj.io/storj/pkg/process"
"storj.io/storj/pkg/storj"
)
const minCheckInterval = time.Minute
var (
cancel context.CancelFunc
@ -42,46 +50,53 @@ var (
Use: "run",
Short: "Run the storagenode-updater for storage node",
Args: cobra.OnlyValidArgs,
RunE: func(cmd *cobra.Command, args []string) (err error) {
err = cmdRun(cmd, args)
if err != nil {
fmt.Println(err)
os.Exit(1)
}
return nil
},
RunE: cmdRun,
}
interval string
versionURL string
binaryLocation string
serviceName string
logPath string
runCfg struct {
// TODO: check interval default has changed from 6 hours to 15 min.
checker.Config
Identity identity.Config
BinaryLocation string `help:"the storage node executable binary location" default:"storagenode.exe"`
ServiceName string `help:"storage node OS service name" default:"storagenode"`
LogPath string `help:"path to log file, if empty standard output will be used" default:""`
}
confDir string
identityDir string
)
func init() {
// TODO: this will probably generate warnings for mismatched config fields.
defaultConfDir := fpath.ApplicationDir("storj", "storagenode")
defaultIdentityDir := fpath.ApplicationDir("storj", "identity", "storagenode")
cfgstruct.SetupFlag(zap.L(), rootCmd, &confDir, "config-dir", defaultConfDir, "main directory for storagenode configuration")
cfgstruct.SetupFlag(zap.L(), rootCmd, &identityDir, "identity-dir", defaultIdentityDir, "main directory for storagenode identity credentials")
defaults := cfgstruct.DefaultsFlag(rootCmd)
rootCmd.AddCommand(runCmd)
runCmd.Flags().StringVar(&interval, "interval", "06h", "interval for checking the new version, 0 or less value will execute version check only once")
runCmd.Flags().StringVar(&versionURL, "version-url", "https://version.storj.io/release/", "version server URL")
runCmd.Flags().StringVar(&binaryLocation, "binary-location", "storagenode.exe", "the storage node executable binary location")
runCmd.Flags().StringVar(&serviceName, "service-name", "storagenode", "storage node OS service name")
runCmd.Flags().StringVar(&logPath, "log", "", "path to log file, if empty standard output will be used")
process.Bind(runCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
}
func cmdRun(cmd *cobra.Command, args []string) (err error) {
if logPath != "" {
logFile, err := os.OpenFile(logPath, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
if runCfg.LogPath != "" {
logFile, err := os.OpenFile(runCfg.LogPath, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
return errs.New("error opening log file: %v", err)
log.Fatalf("error opening log file: %s", err)
}
defer func() { err = errs.Combine(err, logFile.Close()) }()
log.SetOutput(logFile)
}
if !fileExists(binaryLocation) {
return errs.New("unable to find storage node executable binary")
if !fileExists(runCfg.BinaryLocation) {
log.Fatal("unable to find storage node executable binary")
}
ident, err := runCfg.Identity.Load()
if err != nil {
log.Fatalf("error loading identity: %s", err)
}
var ctx context.Context
@ -96,119 +111,116 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) {
cancel()
}()
loopInterval, err := time.ParseDuration(interval)
if err != nil {
return errs.New("unable to parse interval parameter: %v", err)
}
loopFunc := func(ctx context.Context) (err error) {
if err := update(ctx); err != nil {
if err := update(ctx, ident.ID); err != nil {
// don't finish loop in case of error just wait for another execution
log.Println(err)
}
return nil
}
if loopInterval <= 0 {
switch {
case runCfg.CheckInterval <= 0:
err = loopFunc(ctx)
} else {
loop := sync2.NewCycle(loopInterval)
case runCfg.CheckInterval < minCheckInterval:
log.Printf("check interval below minimum: \"%s\", setting to %s", runCfg.CheckInterval, minCheckInterval)
runCfg.CheckInterval = minCheckInterval
fallthrough
default:
loop := sync2.NewCycle(runCfg.CheckInterval)
err = loop.Run(ctx, loopFunc)
}
if err != context.Canceled {
return err
if err != nil && errs2.IsCanceled(err) {
log.Fatal(err)
}
return nil
}
// TODO: refactor
func update(ctx context.Context) (err error) {
// TODO: use config struct binding
clientConfig := checker.ClientConfig{
ServerAddress: versionURL,
RequestTimeout: time.Minute,
}
client := checker.New(clientConfig)
func update(ctx context.Context, nodeID storj.NodeID) (err error) {
client := checker.New(runCfg.ClientConfig)
currentVersion, err := binaryVersion(binaryLocation)
currentVersion, err := binaryVersion(runCfg.BinaryLocation)
if err != nil {
return err
return errs.Wrap(err)
}
log.Println("downloading versions from", versionURL)
process, err := client.Process(ctx, serviceName)
log.Println("downloading versions from", runCfg.ServerAddress)
shouldUpdate, newVersion, err := client.ShouldUpdate(ctx, runCfg.ServiceName, nodeID)
if err != nil {
return err
return errs.Wrap(err)
}
downloadURL := process.Suggested.URL
downloadURL = strings.Replace(downloadURL, "{os}", runtime.GOOS, 1)
downloadURL = strings.Replace(downloadURL, "{arch}", runtime.GOARCH, 1)
// TODO: check rollout
suggestedVersion, err := semver.Parse(process.Suggested.Version)
if err != nil {
return checker.Error.Wrap(err)
}
if currentVersion.Compare(suggestedVersion) < 0 {
tempArchive, err := ioutil.TempFile(os.TempDir(), serviceName)
if shouldUpdate {
downloadURL := newVersion.URL
downloadURL = strings.Replace(downloadURL, "{os}", runtime.GOOS, 1)
downloadURL = strings.Replace(downloadURL, "{arch}", runtime.GOARCH, 1)
// TODO: consolidate semver.Version and version.SemVer
suggestedVersion, err := newVersion.SemVer()
if err != nil {
return errs.New("cannot create temporary archive: %v", err)
}
defer func() { err = errs.Combine(err, os.Remove(tempArchive.Name())) }()
log.Println("start downloading", downloadURL, "to", tempArchive.Name())
err = downloadArchive(ctx, tempArchive, downloadURL)
if err != nil {
return err
}
log.Println("finished downloading", downloadURL, "to", tempArchive.Name())
extension := filepath.Ext(binaryLocation)
if extension != "" {
extension = "." + extension
return errs.Wrap(err)
}
dir := filepath.Dir(binaryLocation)
backupExec := filepath.Join(dir, serviceName+".old."+currentVersion.String()+extension)
if currentVersion.Compare(suggestedVersion) < 0 {
tempArchive, err := ioutil.TempFile(os.TempDir(), runCfg.ServiceName)
if err != nil {
return errs.New("cannot create temporary archive: %v", err)
}
defer func() { err = errs.Combine(err, os.Remove(tempArchive.Name())) }()
if err = os.Rename(binaryLocation, backupExec); err != nil {
return err
log.Println("start downloading", downloadURL, "to", tempArchive.Name())
err = downloadArchive(ctx, tempArchive, downloadURL)
if err != nil {
return errs.Wrap(err)
}
log.Println("finished downloading", downloadURL, "to", tempArchive.Name())
extension := filepath.Ext(runCfg.BinaryLocation)
if extension != "" {
extension = "." + extension
}
dir := filepath.Dir(runCfg.BinaryLocation)
backupExec := filepath.Join(dir, runCfg.ServiceName+".old."+currentVersion.String()+extension)
if err = os.Rename(runCfg.BinaryLocation, backupExec); err != nil {
return errs.Wrap(err)
}
err = unpackBinary(ctx, tempArchive.Name(), runCfg.BinaryLocation)
if err != nil {
return errs.Wrap(err)
}
downloadedVersion, err := binaryVersion(runCfg.BinaryLocation)
if err != nil {
return errs.Wrap(err)
}
if suggestedVersion.Compare(downloadedVersion) != 0 {
return errs.New("invalid version downloaded: wants %s got %s", suggestedVersion.String(), downloadedVersion.String())
}
log.Println("restarting service", runCfg.ServiceName)
err = restartSNService(runCfg.ServiceName)
if err != nil {
// TODO: should we try to recover from this?
return errs.New("unable to restart service: %v", err)
}
log.Println("service", runCfg.ServiceName, "restarted successfully")
// TODO remove old binary ??
} else {
log.Printf("%s version is up to date\n", runCfg.ServiceName)
}
err = unpackBinary(ctx, tempArchive.Name(), binaryLocation)
if err != nil {
return err
}
downloadedVersion, err := binaryVersion(binaryLocation)
if err != nil {
return err
}
if suggestedVersion.Compare(downloadedVersion) != 0 {
return errs.New("invalid version downloaded: wants %s got %s", suggestedVersion.String(), downloadedVersion.String())
}
log.Println("restarting service", serviceName)
err = restartSNService(serviceName)
if err != nil {
return errs.New("unable to restart service: %v", err)
}
log.Println("service", serviceName, "restarted successfully")
// TODO remove old binary ??
} else {
log.Printf("%s version is up to date\n", serviceName)
}
return nil
}
func binaryVersion(location string) (semver.Version, error) {
func binaryVersion(location string) (version.SemVer, error) {
out, err := exec.Command(location, "version").Output()
if err != nil {
return semver.Version{}, err
return version.SemVer{}, err
}
scanner := bufio.NewScanner(bytes.NewReader(out))
@ -217,13 +229,10 @@ func binaryVersion(location string) (semver.Version, error) {
prefix := "Version: "
if strings.HasPrefix(line, prefix) {
line = line[len(prefix):]
if strings.HasPrefix(line, "v") {
line = line[1:]
}
return semver.Make(line)
return version.NewSemVer(line)
}
}
return semver.Version{}, errs.New("unable to determine binary version")
return version.SemVer{}, errs.New("unable to determine binary version")
}
func downloadArchive(ctx context.Context, file io.Writer, url string) (err error) {
@ -278,11 +287,12 @@ func restartSNService(name string) error {
switch runtime.GOOS {
case "windows":
// TODO how run this as one command `net stop servicename && net start servicename`?
_, err := exec.Command("net", "stop", name).Output()
// TODO: combine stdout with err if err
_, err := exec.Command("net", "stop", name).CombinedOutput()
if err != nil {
return err
}
_, err = exec.Command("net", "start", name).Output()
_, err = exec.Command("net", "start", name).CombinedOutput()
if err != nil {
return err
}

View File

@ -7,6 +7,7 @@ import (
"io/ioutil"
"net/http"
"net/http/httptest"
"os"
"os/exec"
"testing"
@ -22,15 +23,38 @@ func TestAutoUpdater(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
content, err := ioutil.ReadFile("testdata/fake-storagenode")
require.NoError(t, err)
testFiles := []struct {
src string
dst string
perms os.FileMode
}{
{
"testdata/fake-storagenode",
ctx.File("storagenode"),
0755,
},
{
"testdata/fake-ident.cert",
ctx.File("identity.cert"),
0644,
},
{
"testdata/fake-ident.key",
ctx.File("identity.key"),
0600,
},
}
tmpExec := ctx.File("storagenode")
err = ioutil.WriteFile(tmpExec, content, 0755)
require.NoError(t, err)
for _, file := range testFiles {
content, err := ioutil.ReadFile(file.src)
require.NoError(t, err)
err = ioutil.WriteFile(file.dst, content, file.perms)
require.NoError(t, err)
}
var mux http.ServeMux
content, err = ioutil.ReadFile("testdata/fake-storagenode.zip")
content, err := ioutil.ReadFile("testdata/fake-storagenode.zip")
require.NoError(t, err)
mux.HandleFunc("/download", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, err := w.Write(content)
@ -42,19 +66,23 @@ func TestAutoUpdater(t *testing.T) {
config := &versioncontrol.Config{
Address: "127.0.0.1:0",
Versions: versioncontrol.ServiceVersions{
Versions: versioncontrol.OldVersionConfig{
Satellite: "v0.0.1",
Storagenode: "v0.0.1",
Uplink: "v0.0.1",
Gateway: "v0.0.1",
Identity: "v0.0.1",
},
Binary: versioncontrol.Versions{
Storagenode: versioncontrol.Binary{
Suggested: versioncontrol.Version{
Binary: versioncontrol.ProcessesConfig{
Storagenode: versioncontrol.ProcessConfig{
Suggested: versioncontrol.VersionConfig{
Version: "0.19.5",
URL: ts.URL + "/download",
},
Rollout: versioncontrol.RolloutConfig{
Seed: "0000000000000000000000000000000000000000000000000000000000000001",
Cursor: 100,
},
},
},
}
@ -69,17 +97,24 @@ func TestAutoUpdater(t *testing.T) {
args = append(args, "run")
args = append(args, "main.go")
args = append(args, "run")
args = append(args, "--version-url")
args = append(args, "--server-address")
args = append(args, "http://"+peer.Addr())
args = append(args, "--binary-location")
args = append(args, tmpExec)
args = append(args, "--interval")
args = append(args, "0")
args = append(args, testFiles[0].dst)
args = append(args, "--check-interval")
args = append(args, "0s")
args = append(args, "--identity.cert-path")
args = append(args, testFiles[1].dst)
args = append(args, "--identity.key-path")
args = append(args, testFiles[2].dst)
out, err := exec.Command("go", args...).CombinedOutput()
assert.NoError(t, err)
result := string(out)
if !assert.NoError(t, err) {
t.Log(result)
t.Fatal(err)
}
if !assert.Contains(t, result, "restarted successfully") {
t.Log(result)
}

View File

@ -0,0 +1,20 @@
-----BEGIN CERTIFICATE-----
MIIBYTCCAQegAwIBAgIQQPuyXOBKrcCNi4ZP689uqjAKBggqhkjOPQQDAjAQMQ4w
DAYDVQQKEwVTdG9yajAiGA8wMDAxMDEwMTAwMDAwMFoYDzAwMDEwMTAxMDAwMDAw
WjAQMQ4wDAYDVQQKEwVTdG9yajBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABApd
LWoax4ObclLAVF7z5c5xEMepbORSLvJHmrXTV+GOzvY9WlWDoCLkjb/0jyraO1rv
PurOF+CyYw+5VKrDy3WjPzA9MA4GA1UdDwEB/wQEAwIFoDAdBgNVHSUEFjAUBggr
BgEFBQcDAQYIKwYBBQUHAwIwDAYDVR0TAQH/BAIwADAKBggqhkjOPQQDAgNIADBF
AiBQXI6Bt3BeSkEo+Petb968xzG3uYV1nQPd9+NUrVt8+AIhALU+87srxcBmZaEU
sYhYKBddEb8iXY/nAs4yucdQPp6E
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
MIIBZDCCAQugAwIBAgIQUe65N4TfVKIodX3NR7xAoDAKBggqhkjOPQQDAjAQMQ4w
DAYDVQQKEwVTdG9yajAiGA8wMDAxMDEwMTAwMDAwMFoYDzAwMDEwMTAxMDAwMDAw
WjAQMQ4wDAYDVQQKEwVTdG9yajBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABO8n
U+YOQgmXXo83mNrTwWeYHU5opITSBWT90h6zKtBzFqt90DSN/bs7u2j3BPAziWCE
/lmTmJoS2QyhCo1h7b6jQzBBMA4GA1UdDwEB/wQEAwICBDATBgNVHSUEDDAKBggr
BgEFBQcDATAPBgNVHRMBAf8EBTADAQH/MAkGBIg3AgEEAQAwCgYIKoZIzj0EAwID
RwAwRAIgPwJKxtVHBGLuw46RP0oP6MBxWqrL/x6pL3TcaqoMdNMCIBufmrivEkxh
Ovahol4B3XuNHxkJcH7nNhuhvCMbxHdg
-----END CERTIFICATE-----

View File

@ -0,0 +1,5 @@
-----BEGIN PRIVATE KEY-----
MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgGjmUhbUm6SCnOE5H
HNZqOblN5PeLptgCauP1KUNChm+hRANCAAQKXS1qGseDm3JSwFRe8+XOcRDHqWzk
Ui7yR5q101fhjs72PVpVg6Ai5I2/9I8q2jta7z7qzhfgsmMPuVSqw8t1
-----END PRIVATE KEY-----

View File

@ -18,7 +18,7 @@ import (
"storj.io/storj/internal/errs2"
"storj.io/storj/internal/memory"
"storj.io/storj/internal/version"
vc_checker "storj.io/storj/internal/version/checker"
versionchecker "storj.io/storj/internal/version/checker"
"storj.io/storj/pkg/identity"
"storj.io/storj/pkg/peertls/extensions"
"storj.io/storj/pkg/peertls/tlsopts"
@ -66,7 +66,7 @@ type SatelliteSystem struct {
Server *server.Server
Version *vc_checker.Service
Version *versionchecker.Service
Contact struct {
Service *contact.Service

View File

@ -27,7 +27,7 @@ func (planet *Planet) newVersionControlServer() (peer *versioncontrol.Peer, err
config := &versioncontrol.Config{
Address: "127.0.0.1:0",
Versions: versioncontrol.ServiceVersions{
Versions: versioncontrol.OldVersionConfig{
Satellite: "v0.0.1",
Storagenode: "v0.0.1",
Uplink: "v0.0.1",
@ -47,14 +47,16 @@ func (planet *Planet) newVersionControlServer() (peer *versioncontrol.Peer, err
// NewVersionInfo returns the Version Info for this planet with tuned metrics.
func (planet *Planet) NewVersionInfo() version.Info {
ver, err := version.NewSemVer("v0.0.1")
if err != nil {
panic(err)
}
info := version.Info{
Timestamp: time.Now(),
CommitHash: "testplanet",
Version: version.SemVer{
Major: 0,
Minor: 0,
Patch: 1},
Release: false,
Version: ver,
Release: false,
}
return info
}

View File

@ -17,6 +17,7 @@ import (
"gopkg.in/spacemonkeygo/monkit.v2"
"storj.io/storj/internal/version"
"storj.io/storj/pkg/storj"
)
var (
@ -87,7 +88,7 @@ func (client *Client) OldMinimum(ctx context.Context, serviceName string) (ver v
versions, err := client.All(ctx)
if err != nil {
return version.SemVer{}, err
return version.SemVer{}, Error.Wrap(err)
}
r := reflect.ValueOf(&versions)
@ -105,7 +106,7 @@ func (client *Client) Process(ctx context.Context, processName string) (process
versions, err := client.All(ctx)
if err != nil {
return version.Process{}, err
return version.Process{}, Error.Wrap(err)
}
processesValue := reflect.ValueOf(versions.Processes)
@ -122,3 +123,20 @@ func (client *Client) Process(ctx context.Context, processName string) (process
}
return process, nil
}
// ShouldUpdate downloads the rollout state from the versioncontrol server and
// checks if a user with the given nodeID should update, and if so, to what version.
func (client *Client) ShouldUpdate(ctx context.Context, processName string, nodeID storj.NodeID) (_ bool, _ version.Version, err error) {
defer mon.Task()(&ctx, processName)(&err)
process, err := client.Process(ctx, processName)
if err != nil {
return false, version.Version{}, Error.Wrap(err)
}
shouldUpdate := version.ShouldUpdate(process.Rollout, nodeID)
if shouldUpdate {
return true, process.Suggested, nil
}
return false, version.Version{}, nil
}

View File

@ -4,6 +4,7 @@
package checker_test
import (
"encoding/hex"
"fmt"
"reflect"
"testing"
@ -14,9 +15,12 @@ import (
"storj.io/storj/internal/testcontext"
"storj.io/storj/internal/version"
"storj.io/storj/internal/version/checker"
"storj.io/storj/pkg/storj"
"storj.io/storj/versioncontrol"
)
var testHexSeed = "000102030405060708090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f"
func TestClient_All(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
@ -73,6 +77,42 @@ func TestClient_Process(t *testing.T) {
require.Equal(t, expectedVersionStr, process.Minimum.Version)
require.Equal(t, expectedVersionStr, process.Suggested.Version)
actualHexSeed := hex.EncodeToString(process.Rollout.Seed[:])
require.NoError(t, err)
require.Equal(t, testHexSeed, actualHexSeed)
// TODO: find a better way to test this
require.NotEmpty(t, process.Rollout.Cursor)
}
}
func TestClient_ShouldUpdate(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
peer := newTestPeer(t, ctx)
defer ctx.Check(peer.Close)
clientConfig := checker.ClientConfig{
ServerAddress: "http://" + peer.Addr(),
RequestTimeout: 0,
}
client := checker.New(clientConfig)
processesType := reflect.TypeOf(version.Processes{})
fieldCount := processesType.NumField()
for i := 1; i < fieldCount; i++ {
field := processesType.Field(i - 1)
expectedVersionStr := fmt.Sprintf("v%d.%d.%d", i, i+1, i+2)
// NB: test cursor is 100%; rollout/nodeID should-update calculation is tested elsewhere.
shouldUpdate, ver, err := client.ShouldUpdate(ctx, field.Name, storj.NodeID{})
require.NoError(t, err)
require.True(t, shouldUpdate)
require.Equal(t, expectedVersionStr, ver.Version)
}
}
@ -82,7 +122,7 @@ func newTestPeer(t *testing.T, ctx *testcontext.Context) *versioncontrol.Peer {
testVersions := newTestVersions(t)
serverConfig := &versioncontrol.Config{
Address: "127.0.0.1:0",
Versions: versioncontrol.ServiceVersions{
Versions: versioncontrol.OldVersionConfig{
Satellite: "v0.0.1",
Storagenode: "v0.0.1",
Uplink: "v0.0.1",
@ -101,7 +141,7 @@ func newTestPeer(t *testing.T, ctx *testcontext.Context) *versioncontrol.Peer {
return peer
}
func newTestVersions(t *testing.T) (versions versioncontrol.Versions) {
func newTestVersions(t *testing.T) (versions versioncontrol.ProcessesConfig) {
t.Helper()
versionsValue := reflect.ValueOf(&versions)
@ -112,13 +152,17 @@ func newTestVersions(t *testing.T) (versions versioncontrol.Versions) {
field := versionsElem.Field(i)
versionString := fmt.Sprintf("v%d.%d.%d", i+1, i+2, i+3)
binary := versioncontrol.Binary{
Minimum: versioncontrol.Version{
binary := versioncontrol.ProcessConfig{
Minimum: versioncontrol.VersionConfig{
Version: versionString,
},
Suggested: versioncontrol.Version{
Suggested: versioncontrol.VersionConfig{
Version: versionString,
},
Rollout: versioncontrol.RolloutConfig{
Seed: testHexSeed,
Cursor: 100,
},
}
field.Set(reflect.ValueOf(binary))

View File

@ -9,15 +9,15 @@ import (
)
// Stats implements the monkit.StatSource interface
func (v *Info) Stats(reportValue func(name string, val float64)) {
if v.Release {
func (info *Info) Stats(reportValue func(name string, val float64)) {
if info.Release {
reportValue("release", 1)
} else {
reportValue("release", 0)
}
reportValue("timestamp", float64(v.Timestamp.Unix()))
reportValue("timestamp", float64(info.Timestamp.Unix()))
crc := atomic.LoadUint32(&v.commitHashCRC)
crc := atomic.LoadUint32(&info.commitHashCRC)
if crc == 0 {
c := crc32.NewIEEE()
@ -25,11 +25,11 @@ func (v *Info) Stats(reportValue func(name string, val float64)) {
if err != nil {
panic(err)
}
atomic.StoreUint32(&v.commitHashCRC, c.Sum32())
atomic.StoreUint32(&info.commitHashCRC, c.Sum32())
}
reportValue("commit", float64(crc))
reportValue("major", float64(v.Version.Major))
reportValue("minor", float64(v.Version.Minor))
reportValue("patch", float64(v.Version.Patch))
reportValue("major", float64(info.Version.Major))
reportValue("minor", float64(info.Version.Minor))
reportValue("patch", float64(info.Version.Patch))
}

View File

@ -4,25 +4,26 @@
package version
import (
"bytes"
"crypto/hmac"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"regexp"
"math/big"
"reflect"
"strconv"
"strings"
"time"
"github.com/blang/semver"
"github.com/zeebo/errs"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/storj"
)
// semVerRegex is the regular expression used to parse a semantic version.
// https://github.com/Masterminds/semver/blob/master/LICENSE.txt
const (
semVerRegex string = `v?([0-9]+)\.([0-9]+)\.([0-9]+)`
quote = byte('"')
)
const quote = byte('"')
var (
// VerError is the error class for version-related errors.
@ -37,8 +38,6 @@ var (
// Build is a struct containing all relevant build information associated with the binary
Build Info
versionRegex = regexp.MustCompile("^" + semVerRegex + "$")
)
// Info is the versioning information for a binary
@ -53,10 +52,9 @@ type Info struct {
}
// SemVer represents a semantic version
// TODO: replace with semver.Version
type SemVer struct {
Major int64 `json:"major"`
Minor int64 `json:"minor"`
Patch int64 `json:"patch"`
semver.Version
}
// AllowedVersions provides the Minimum SemVer per Service
@ -105,6 +103,11 @@ type RolloutBytes [32]byte
// MarshalJSON hex-encodes RolloutBytes and pre/appends JSON string literal quotes.
func (rb RolloutBytes) MarshalJSON() ([]byte, error) {
zeroRolloutBytes := RolloutBytes{}
if bytes.Equal(rb[:], zeroRolloutBytes[:]) {
return []byte{quote, quote}, nil
}
hexBytes := make([]byte, hex.EncodedLen(len(rb)))
hex.Encode(hexBytes, rb[:])
encoded := append([]byte{quote}, hexBytes...)
@ -122,52 +125,20 @@ func (rb *RolloutBytes) UnmarshalJSON(b []byte) error {
// NewSemVer parses a given version and returns an instance of SemVer or
// an error if unable to parse the version.
func NewSemVer(v string) (sv SemVer, err error) {
m := versionRegex.FindStringSubmatch(v)
if m == nil {
return SemVer{}, VerError.New("invalid semantic version for build %s", v)
}
// first entry of m is the entire version string
sv.Major, err = strconv.ParseInt(m[1], 10, 64)
func NewSemVer(v string) (SemVer, error) {
ver, err := semver.ParseTolerant(v)
if err != nil {
return SemVer{}, VerError.Wrap(err)
}
sv.Minor, err = strconv.ParseInt(m[2], 10, 64)
if err != nil {
return SemVer{}, VerError.Wrap(err)
}
sv.Patch, err = strconv.ParseInt(m[3], 10, 64)
if err != nil {
return SemVer{}, VerError.Wrap(err)
}
return sv, nil
return SemVer{
Version: ver,
}, nil
}
// Compare compare two versions, return -1 if compared version is greater, 0 if equal and 1 if less.
func (sem *SemVer) Compare(version SemVer) int {
result := sem.Major - version.Major
if result > 0 {
return 1
} else if result < 0 {
return -1
}
result = sem.Minor - version.Minor
if result > 0 {
return 1
} else if result < 0 {
return -1
}
result = sem.Patch - version.Patch
if result > 0 {
return 1
} else if result < 0 {
return -1
}
return 0
return sem.Version.Compare(version.Version)
}
// String converts the SemVer struct to a more easy to handle string
@ -175,15 +146,30 @@ func (sem *SemVer) String() (version string) {
return fmt.Sprintf("v%d.%d.%d", sem.Major, sem.Minor, sem.Patch)
}
// IsZero checks if the semantic version is its zero value.
func (sem SemVer) IsZero() bool {
return reflect.ValueOf(sem).IsZero()
}
// SemVer converts a version struct into a semantic version struct.
func (ver *Version) SemVer() (SemVer, error) {
return NewSemVer(ver.Version)
}
// New creates Version_Info from a json byte array
func New(data []byte) (v Info, err error) {
err = json.Unmarshal(data, &v)
return v, VerError.Wrap(err)
}
// IsZero checks if the version struct is its zero value.
func (info Info) IsZero() bool {
return reflect.ValueOf(info).IsZero()
}
// Marshal converts the existing Version Info to any json byte array
func (v Info) Marshal() ([]byte, error) {
data, err := json.Marshal(v)
func (info Info) Marshal() ([]byte, error) {
data, err := json.Marshal(info)
if err != nil {
return nil, VerError.Wrap(err)
}
@ -193,15 +179,42 @@ func (v Info) Marshal() ([]byte, error) {
// Proto converts an Info struct to a pb.NodeVersion
// TODO: shouldn't we just use pb.NodeVersion everywhere? gogoproto will let
// us make it match Info.
func (v Info) Proto() (*pb.NodeVersion, error) {
func (info Info) Proto() (*pb.NodeVersion, error) {
return &pb.NodeVersion{
Version: v.Version.String(),
CommitHash: v.CommitHash,
Timestamp: v.Timestamp,
Release: v.Release,
Version: info.Version.String(),
CommitHash: info.CommitHash,
Timestamp: info.Timestamp,
Release: info.Release,
}, nil
}
// PercentageToCursor calculates the cursor value for the given percentage of nodes which should update.
func PercentageToCursor(pct int) RolloutBytes {
// NB: convert the max value to a number, multiply by the percentage, convert back.
var maxInt, maskInt big.Int
var maxBytes RolloutBytes
for i := 0; i < len(maxBytes); i++ {
maxBytes[i] = 255
}
maxInt.SetBytes(maxBytes[:])
maskInt.Div(maskInt.Mul(&maxInt, big.NewInt(int64(pct))), big.NewInt(100))
var cursor RolloutBytes
copy(cursor[:], maskInt.Bytes())
return cursor
}
// ShouldUpdate checks if for the the given rollout state, a user with the given nodeID should update.
func ShouldUpdate(rollout Rollout, nodeID storj.NodeID) bool {
hash := hmac.New(sha256.New, rollout.Seed[:])
_, err := hash.Write(nodeID[:])
if err != nil {
panic(err)
}
return bytes.Compare(hash.Sum(nil), rollout.Cursor[:]) <= 0
}
func init() {
if buildVersion == "" && buildTimestamp == "" && buildCommitHash == "" && buildRelease == "" {
return
@ -226,5 +239,4 @@ func init() {
if Build.Timestamp.Unix() == 0 || Build.CommitHash == "" {
Build.Release = false
}
}

View File

@ -5,13 +5,39 @@ package version_test
import (
"encoding/json"
"math"
"math/rand"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"storj.io/storj/internal/version"
"storj.io/storj/pkg/storj"
)
func TestInfo_IsZero(t *testing.T) {
zeroInfo := version.Info{}
require.True(t, zeroInfo.IsZero())
ver, err := version.NewSemVer("1.2.3")
require.NoError(t, err)
info := version.Info{
Version: ver,
}
require.False(t, info.IsZero())
}
func TestSemVer_IsZero(t *testing.T) {
zeroVer := version.SemVer{}
require.True(t, zeroVer.IsZero())
ver, err := version.NewSemVer("1.2.3")
require.NoError(t, err)
require.False(t, ver.IsZero())
}
func TestSemVer_Compare(t *testing.T) {
version001, err := version.NewSemVer("v0.0.1")
require.NoError(t, err)
@ -45,24 +71,73 @@ func TestSemVer_Compare(t *testing.T) {
}
func TestRollout_MarshalJSON_UnmarshalJSON(t *testing.T) {
var expectedRollout, actualRollout version.Rollout
var arbitraryRollout version.Rollout
for i := 0; i < len(version.RolloutBytes{}); i++ {
expectedRollout.Seed[i] = byte(i)
expectedRollout.Cursor[i] = byte(i * 2)
arbitraryRollout.Seed[i] = byte(i)
arbitraryRollout.Cursor[i] = byte(i * 2)
}
_, err := json.Marshal(actualRollout.Seed)
require.NoError(t, err)
scenarios := []struct {
name string
rollout version.Rollout
}{
{
"arbitrary rollout",
arbitraryRollout,
},
{
"empty rollout",
version.Rollout{},
},
}
emptyJSONRollout, err := json.Marshal(actualRollout)
require.NoError(t, err)
for _, scenario := range scenarios {
scenario := scenario
t.Run(scenario.name, func(t *testing.T) {
var actualRollout version.Rollout
jsonRollout, err := json.Marshal(expectedRollout)
require.NoError(t, err)
require.NotEqual(t, emptyJSONRollout, jsonRollout)
_, err := json.Marshal(actualRollout.Seed)
require.NoError(t, err)
err = json.Unmarshal(jsonRollout, &actualRollout)
require.NoError(t, err)
require.Equal(t, expectedRollout, actualRollout)
jsonRollout, err := json.Marshal(scenario.rollout)
require.NoError(t, err)
err = json.Unmarshal(jsonRollout, &actualRollout)
require.NoError(t, err)
require.Equal(t, scenario.rollout, actualRollout)
})
}
}
func TestShouldUpdate(t *testing.T) {
// NB: total and acceptable tolerance are negatively correlated.
total := 10000
tolerance := total * 2 / 100 // 2%
for p := 10; p < 100; p += 10 {
var updates int
percentage := p
cursor := version.PercentageToCursor(percentage)
rollout := version.Rollout{
Seed: version.RolloutBytes{},
Cursor: cursor,
}
rand.Read(rollout.Seed[:])
for i := 0; i < total; i++ {
var nodeID storj.NodeID
_, err := rand.Read(nodeID[:])
require.NoError(t, err)
if version.ShouldUpdate(rollout, nodeID) {
updates++
}
}
assert.Condition(t, func() bool {
diff := updates - (total * percentage / 100)
return int(math.Abs(float64(diff))) < tolerance
})
}
}

View File

@ -310,7 +310,7 @@ func cmdVersion(cmd *cobra.Command, args []string) (err error) {
fmt.Println("Development build")
}
if version.Build.Version != (version.SemVer{}) {
if !version.Build.Version.IsZero() {
fmt.Println("Version:", version.Build.Version.String())
}
if !version.Build.Timestamp.IsZero() {

View File

@ -135,8 +135,7 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB, pointerDB metai
var err error
{
test := version.Info{}
if test != versionInfo {
if !versionInfo.IsZero() {
peer.Log.Sugar().Debugf("Binary Version: %s with CommitHash %s, built at %s as Release %v",
versionInfo.Version.String(), versionInfo.CommitHash, versionInfo.Timestamp.String(), versionInfo.Release)
}

View File

@ -260,8 +260,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, pointerDB metainfo
var err error
{ // setup version control
test := version.Info{}
if test != versionInfo {
if !versionInfo.IsZero() {
peer.Log.Sugar().Debugf("Binary Version: %s with CommitHash %s, built at %s as Release %v",
versionInfo.Version.String(), versionInfo.CommitHash, versionInfo.Timestamp.String(), versionInfo.Release)
}

View File

@ -661,9 +661,9 @@ func (cache *overlaycache) UpdateNodeInfo(ctx context.Context, nodeID storj.Node
if err != nil {
return nil, errs.New("unable to convert version to semVer")
}
updateFields.Major = dbx.Node_Major(semVer.Major)
updateFields.Minor = dbx.Node_Minor(semVer.Minor)
updateFields.Patch = dbx.Node_Patch(semVer.Patch)
updateFields.Major = dbx.Node_Major(int64(semVer.Major))
updateFields.Minor = dbx.Node_Minor(int64(semVer.Minor))
updateFields.Patch = dbx.Node_Patch(int64(semVer.Patch))
updateFields.Hash = dbx.Node_Hash(nodeInfo.GetVersion().GetCommitHash())
updateFields.Timestamp = dbx.Node_Timestamp(nodeInfo.GetVersion().Timestamp)
updateFields.Release = dbx.Node_Release(nodeInfo.GetVersion().GetRelease())
@ -954,10 +954,9 @@ func convertDBNode(ctx context.Context, info *dbx.Node) (_ *overlay.NodeDossier,
if err != nil {
return nil, err
}
ver := &version.SemVer{
Major: info.Major,
Minor: info.Minor,
Patch: info.Patch,
ver, err := version.NewSemVer(fmt.Sprintf("%d.%d.%d", info.Major, info.Minor, info.Patch))
if err != nil {
return nil, err
}
exitStatus := overlay.ExitStatus{NodeID: id}

View File

@ -175,8 +175,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
var err error
{
test := version.Info{}
if test != versionInfo {
if !versionInfo.IsZero() {
peer.Log.Sugar().Debugf("Binary Version: %s with CommitHash %s, built at %s as Release %v",
versionInfo.Version.String(), versionInfo.CommitHash, versionInfo.Timestamp.String(), versionInfo.Release)
}

View File

@ -7,7 +7,6 @@ import (
"context"
"encoding/hex"
"encoding/json"
"math/big"
"net"
"net/http"
"reflect"
@ -33,13 +32,14 @@ var (
// Config is all the configuration parameters for a Version Control Server.
type Config struct {
Address string `user:"true" help:"public address to listen on" default:":8080"`
Versions ServiceVersions
Versions OldVersionConfig
Binary Versions
Binary ProcessesConfig
}
// ServiceVersions provides a list of allowed Versions per Service.
type ServiceVersions struct {
// OldVersionConfig provides a list of allowed Versions per process.
// NB: this will be deprecated in favor of `ProcessesConfig`.
type OldVersionConfig struct {
Satellite string `user:"true" help:"Allowed Satellite Versions" default:"v0.0.1"`
Storagenode string `user:"true" help:"Allowed Storagenode Versions" default:"v0.0.1"`
Uplink string `user:"true" help:"Allowed Uplink Versions" default:"v0.0.1"`
@ -47,32 +47,30 @@ type ServiceVersions struct {
Identity string `user:"true" help:"Allowed Identity Versions" default:"v0.0.1"`
}
// Versions represents versions for all binaries.
// TODO: this name is inconsistent with the internal/version pkg's analogue, `Processes`.
type Versions struct {
Satellite Binary
Storagenode Binary
Uplink Binary
Gateway Binary
Identity Binary
// ProcessesConfig represents versions configuration for all processes.
type ProcessesConfig struct {
Satellite ProcessConfig
Storagenode ProcessConfig
Uplink ProcessConfig
Gateway ProcessConfig
Identity ProcessConfig
}
// Binary represents versions for single binary.
// TODO: This name is inconsistent with the internal/version pkg's analogue, `Process`.
type Binary struct {
Minimum Version
Suggested Version
Rollout Rollout
// ProcessConfig represents versions configuration for a single process.
type ProcessConfig struct {
Minimum VersionConfig
Suggested VersionConfig
Rollout RolloutConfig
}
// Version single version.
type Version struct {
// VersionConfig single version configuration.
type VersionConfig struct {
Version string `user:"true" help:"peer version" default:"v0.0.1"`
URL string `user:"true" help:"URL for specific binary" default:""`
}
// Rollout represents the state of a version rollout of a binary to the suggested version.
type Rollout struct {
// RolloutConfig represents the state of a version rollout configuration of a process.
type RolloutConfig struct {
Seed string `user:"true" help:"random 32 byte, hex-encoded string"`
Cursor int `user:"true" help:"percentage of nodes which should roll-out to the suggested version" default:"0"`
}
@ -126,7 +124,7 @@ func New(log *zap.Logger, config *Config) (peer *Peer, err error) {
Log: log,
}
// Convert each Service's Version String to SemVer
// Convert each Service's VersionConfig String to SemVer
peer.Versions.Satellite, err = version.NewSemVer(config.Versions.Satellite)
if err != nil {
return &Peer{}, err
@ -225,12 +223,12 @@ func (peer *Peer) Close() (err error) {
func (peer *Peer) Addr() string { return peer.Server.Listener.Addr().String() }
// ValidateRollouts validates the rollout field of each field in the Versions struct.
func (versions Versions) ValidateRollouts(log *zap.Logger) error {
func (versions ProcessesConfig) ValidateRollouts(log *zap.Logger) error {
value := reflect.ValueOf(versions)
fieldCount := value.NumField()
validationErrs := errs.Group{}
for i := 0; i < fieldCount; i++ {
binary, ok := value.Field(i).Interface().(Binary)
binary, ok := value.Field(i).Interface().(ProcessConfig)
if !ok {
log.Warn("non-binary field in versions config struct", zap.String("field name", value.Type().Field(i).Name))
continue
@ -247,7 +245,7 @@ func (versions Versions) ValidateRollouts(log *zap.Logger) error {
}
// Validate validates the rollout seed and cursor config values.
func (rollout Rollout) Validate() error {
func (rollout RolloutConfig) Validate() error {
seedLen := len(rollout.Seed)
if seedLen == 0 {
return EmptySeedErr
@ -267,23 +265,7 @@ func (rollout Rollout) Validate() error {
return nil
}
func percentageToCursor(pct int) version.RolloutBytes {
// NB: convert the max value to a number, multiply by the percentage, convert back.
var maxInt, maskInt big.Int
var maxBytes version.RolloutBytes
for i := 0; i < len(maxBytes); i++ {
maxBytes[i] = 255
}
maxInt.SetBytes(maxBytes[:])
maskInt.Div(maskInt.Mul(&maxInt, big.NewInt(int64(pct))), big.NewInt(100))
var cursor version.RolloutBytes
copy(cursor[:], maskInt.Bytes())
return cursor
}
func configToProcess(binary Binary) (version.Process, error) {
func configToProcess(binary ProcessConfig) (version.Process, error) {
process := version.Process{
Minimum: version.Version{
Version: binary.Minimum.Version,
@ -294,7 +276,7 @@ func configToProcess(binary Binary) (version.Process, error) {
URL: binary.Suggested.URL,
},
Rollout: version.Rollout{
Cursor: percentageToCursor(binary.Rollout.Cursor),
Cursor: version.PercentageToCursor(binary.Rollout.Cursor),
},
}

View File

@ -17,12 +17,12 @@ import (
var rolloutErrScenarios = []struct {
name string
rollout versioncontrol.Rollout
rollout versioncontrol.RolloutConfig
errContains string
}{
{
"short seed",
versioncontrol.Rollout{
versioncontrol.RolloutConfig{
// 31 byte seed
Seed: "00000000000000000000000000000000000000000000000000000000000000",
Cursor: 0,
@ -31,7 +31,7 @@ var rolloutErrScenarios = []struct {
},
{
"long seed",
versioncontrol.Rollout{
versioncontrol.RolloutConfig{
// 33 byte seed
Seed: "000000000000000000000000000000000000000000000000000000000000000000",
Cursor: 0,
@ -40,7 +40,7 @@ var rolloutErrScenarios = []struct {
},
{
"invalid seed",
versioncontrol.Rollout{
versioncontrol.RolloutConfig{
// non-hex seed
Seed: "G000000000000000000000000000000000000000000000000000000000000000",
Cursor: 0,
@ -49,7 +49,7 @@ var rolloutErrScenarios = []struct {
},
{
"negative cursor",
versioncontrol.Rollout{
versioncontrol.RolloutConfig{
Seed: "0000000000000000000000000000000000000000000000000000000000000000",
Cursor: -1,
},
@ -57,7 +57,7 @@ var rolloutErrScenarios = []struct {
},
{
"cursor too big",
versioncontrol.Rollout{
versioncontrol.RolloutConfig{
Seed: "0000000000000000000000000000000000000000000000000000000000000000",
Cursor: 101,
},
@ -67,7 +67,7 @@ var rolloutErrScenarios = []struct {
func TestPeer_Run(t *testing.T) {
testVersion := "v0.0.1"
testServiceVersions := versioncontrol.ServiceVersions{
testServiceVersions := versioncontrol.OldVersionConfig{
Gateway: testVersion,
Identity: testVersion,
Satellite: testVersion,
@ -89,17 +89,17 @@ func TestPeer_Run(t *testing.T) {
})
t.Run("empty rollout seed", func(t *testing.T) {
versionsType := reflect.TypeOf(versioncontrol.Versions{})
versionsType := reflect.TypeOf(versioncontrol.ProcessesConfig{})
fieldCount := versionsType.NumField()
// test invalid rollout for each binary
for i := 0; i < fieldCount; i++ {
versions := versioncontrol.Versions{}
versions := versioncontrol.ProcessesConfig{}
versionsValue := reflect.ValueOf(&versions)
field := versionsValue.Elem().Field(i)
binary := versioncontrol.Binary{
Rollout: versioncontrol.Rollout{
binary := versioncontrol.ProcessConfig{
Rollout: versioncontrol.RolloutConfig{
Seed: "",
Cursor: 0,
},
@ -123,16 +123,16 @@ func TestPeer_Run_error(t *testing.T) {
for _, scenario := range rolloutErrScenarios {
scenario := scenario
t.Run(scenario.name, func(t *testing.T) {
versionsType := reflect.TypeOf(versioncontrol.Versions{})
versionsType := reflect.TypeOf(versioncontrol.ProcessesConfig{})
fieldCount := versionsType.NumField()
// test invalid rollout for each binary
for i := 0; i < fieldCount; i++ {
versions := versioncontrol.Versions{}
versions := versioncontrol.ProcessesConfig{}
versionsValue := reflect.ValueOf(&versions)
field := reflect.Indirect(versionsValue).Field(i)
binary := versioncontrol.Binary{
binary := versioncontrol.ProcessConfig{
Rollout: scenario.rollout,
}
@ -159,7 +159,7 @@ func TestVersions_ValidateRollouts(t *testing.T) {
func TestRollout_Validate(t *testing.T) {
for i := 0; i < 100; i++ {
rollout := versioncontrol.Rollout{
rollout := versioncontrol.RolloutConfig{
Seed: randSeedString(t),
Cursor: i,
}
@ -181,32 +181,32 @@ func TestRollout_Validate_error(t *testing.T) {
}
}
func validRandVersions(t *testing.T) versioncontrol.Versions {
func validRandVersions(t *testing.T) versioncontrol.ProcessesConfig {
t.Helper()
return versioncontrol.Versions{
Satellite: versioncontrol.Binary{
return versioncontrol.ProcessesConfig{
Satellite: versioncontrol.ProcessConfig{
Rollout: randRollout(t),
},
Storagenode: versioncontrol.Binary{
Storagenode: versioncontrol.ProcessConfig{
Rollout: randRollout(t),
},
Uplink: versioncontrol.Binary{
Uplink: versioncontrol.ProcessConfig{
Rollout: randRollout(t),
},
Gateway: versioncontrol.Binary{
Gateway: versioncontrol.ProcessConfig{
Rollout: randRollout(t),
},
Identity: versioncontrol.Binary{
Identity: versioncontrol.ProcessConfig{
Rollout: randRollout(t),
},
}
}
func randRollout(t *testing.T) versioncontrol.Rollout {
func randRollout(t *testing.T) versioncontrol.RolloutConfig {
t.Helper()
return versioncontrol.Rollout{
return versioncontrol.RolloutConfig{
Seed: randSeedString(t),
Cursor: rand.Intn(101),
}