versioncontrol: add process url resolver

Change-Id: Iddd0d6cd976e10cbed3def85af72259d3c14e717
This commit is contained in:
Yaroslav Vorobiov 2020-12-03 04:14:43 +02:00 committed by paul cannon
parent 678b07b314
commit 1ed7227521
3 changed files with 282 additions and 25 deletions

View File

@ -0,0 +1,40 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package versioncontrol
// SupportedBinaries list of supported binary schemes.
var SupportedBinaries = []string{
"identity_darwin_amd64",
"identity_freebsd_amd64",
"identity_linux_amd64",
"identity_linux_arm",
"identity_linux_arm64",
"identity_windows_amd64",
"storagenode-updater_linux_amd64",
"storagenode-updater_linux_arm",
"storagenode-updater_linux_arm64",
"storagenode-updater_windows_amd64",
"storagenode_freebsd_amd64",
"storagenode_linux_amd64",
"storagenode_linux_arm",
"storagenode_linux_arm64",
"storagenode_windows_amd64",
"uplink_darwin_amd64",
"uplink_freebsd_amd64",
"uplink_linux_amd64",
"uplink_linux_arm",
"uplink_linux_arm64",
"uplink_windows_amd64",
}
// isBinarySupported check if binary scheme matching provided service, os and arch is supported.
func isBinarySupported(service, os, arch string) (string, bool) {
binary := service + "_" + os + "_" + arch
for _, supportedBinary := range SupportedBinaries {
if binary == supportedBinary {
return binary, true
}
}
return binary, false
}

View File

@ -8,10 +8,13 @@ import (
"encoding/hex" "encoding/hex"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt"
"net" "net"
"net/http" "net/http"
"reflect" "reflect"
"strings"
"github.com/gorilla/mux"
"github.com/zeebo/errs" "github.com/zeebo/errs"
"go.uber.org/zap" "go.uber.org/zap"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
@ -90,27 +93,13 @@ type Peer struct {
Endpoint http.Server Endpoint http.Server
Listener net.Listener Listener net.Listener
} }
Versions version.AllowedVersions Versions version.AllowedVersions
// response contains the byte version of current allowed versions // response contains the byte version of current allowed versions
response []byte response []byte
} }
// HandleGet contains the request handler for the version control web server.
func (peer *Peer) HandleGet(w http.ResponseWriter, r *http.Request) {
// Only handle GET Requests
if r.Method != http.MethodGet {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
w.Header().Set("Content-Type", "application/json")
_, err := w.Write(peer.response)
if err != nil {
peer.Log.Error("Error writing response to client.", zap.Error(err))
}
}
// New creates a new VersionControl Server. // New creates a new VersionControl Server.
func New(log *zap.Logger, config *Config) (peer *Peer, err error) { func New(log *zap.Logger, config *Config) (peer *Peer, err error) {
if err := config.Binary.ValidateRollouts(log); err != nil { if err := config.Binary.ValidateRollouts(log); err != nil {
@ -147,7 +136,6 @@ func New(log *zap.Logger, config *Config) (peer *Peer, err error) {
return &Peer{}, err return &Peer{}, err
} }
peer.Versions.Processes = version.Processes{}
peer.Versions.Processes.Satellite, err = configToProcess(config.Binary.Satellite) peer.Versions.Processes.Satellite, err = configToProcess(config.Binary.Satellite)
if err != nil { if err != nil {
return nil, RolloutErr.Wrap(err) return nil, RolloutErr.Wrap(err)
@ -186,22 +174,101 @@ func New(log *zap.Logger, config *Config) (peer *Peer, err error) {
peer.Log.Debug("Setting version info.", zap.ByteString("Value", peer.response)) peer.Log.Debug("Setting version info.", zap.ByteString("Value", peer.response))
mux := http.NewServeMux() {
mux.HandleFunc("/", peer.HandleGet) router := mux.NewRouter()
peer.Server.Endpoint = http.Server{ router.HandleFunc("/", peer.versionHandle).Methods(http.MethodGet)
Handler: mux, router.HandleFunc("/processes/{service}/{version}/url", peer.processURLHandle).Methods(http.MethodGet)
peer.Server.Endpoint = http.Server{
Handler: router,
}
peer.Server.Listener, err = net.Listen("tcp", config.Address)
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
} }
peer.Server.Listener, err = net.Listen("tcp", config.Address)
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
return peer, nil return peer, nil
} }
// versionHandle handles all process versions request.
func (peer *Peer) versionHandle(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
_, err := w.Write(peer.response)
if err != nil {
peer.Log.Error("Error writing response to client.", zap.Error(err))
}
}
// processURLHandle handles process binary url resolving.
func (peer *Peer) processURLHandle(w http.ResponseWriter, r *http.Request) {
params := mux.Vars(r)
service := params["service"]
versionType := params["version"]
var process version.Process
switch service {
case "satellite":
process = peer.Versions.Processes.Satellite
case "storagenode":
process = peer.Versions.Processes.Storagenode
case "storagenode-updater":
process = peer.Versions.Processes.StoragenodeUpdater
case "uplink":
process = peer.Versions.Processes.Uplink
case "gateway":
process = peer.Versions.Processes.Gateway
case "identity":
process = peer.Versions.Processes.Identity
default:
http.Error(w, "service does not exists", http.StatusNotFound)
return
}
var url string
switch versionType {
case "minimum":
url = process.Minimum.URL
case "suggested":
url = process.Suggested.URL
default:
http.Error(w, "invalid version, should be minimum or suggested", http.StatusBadRequest)
return
}
query := r.URL.Query()
os := query.Get("os")
if os == "" {
http.Error(w, "goos is not specified", http.StatusBadRequest)
return
}
arch := query.Get("arch")
if arch == "" {
http.Error(w, "goarch is not specified", http.StatusBadRequest)
return
}
if scheme, ok := isBinarySupported(service, os, arch); !ok {
http.Error(w, fmt.Sprintf("binary scheme %s is not supported", scheme), http.StatusNotFound)
return
}
url = strings.Replace(url, "{os}", os, 1)
url = strings.Replace(url, "{arch}", arch, 1)
w.Header().Set("Content-Type", "text/plain")
_, err := w.Write([]byte(url))
if err != nil {
peer.Log.Error("Error writing response to client.", zap.Error(err))
}
}
// Run runs versioncontrol server until it's either closed or it errors. // Run runs versioncontrol server until it's either closed or it errors.
func (peer *Peer) Run(ctx context.Context) (err error) { func (peer *Peer) Run(ctx context.Context) (err error) {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
var group errgroup.Group var group errgroup.Group

View File

@ -4,14 +4,20 @@
package versioncontrol_test package versioncontrol_test
import ( import (
"context"
"encoding/hex" "encoding/hex"
"io/ioutil"
"math/rand" "math/rand"
"net/http"
"reflect" "reflect"
"strings"
"testing" "testing"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest" "go.uber.org/zap/zaptest"
"golang.org/x/sync/errgroup"
"storj.io/common/testcontext"
"storj.io/storj/versioncontrol" "storj.io/storj/versioncontrol"
) )
@ -65,6 +71,150 @@ var rolloutErrScenarios = []struct {
}, },
} }
func TestPeerEndpoint(t *testing.T) {
minimumVersion := "v0.0.1"
suggestedVersion := "v0.0.2"
createURL := func(process, version string) string {
urlTmpl := "http://example.com/{version}/{process}_{os}_{arch}"
url := strings.Replace(urlTmpl, "{version}", version, 1)
url = strings.Replace(url, "{process}", process, 1)
return url
}
config := &versioncontrol.Config{
Address: "127.0.0.1:0",
Versions: versioncontrol.OldVersionConfig{
Satellite: minimumVersion,
Storagenode: minimumVersion,
Uplink: minimumVersion,
Gateway: minimumVersion,
Identity: minimumVersion,
},
Binary: versioncontrol.ProcessesConfig{
Storagenode: versioncontrol.ProcessConfig{
Minimum: versioncontrol.VersionConfig{
Version: minimumVersion,
URL: createURL("storagenode", minimumVersion),
},
Suggested: versioncontrol.VersionConfig{
Version: suggestedVersion,
URL: createURL("storagenode", suggestedVersion),
},
},
StoragenodeUpdater: versioncontrol.ProcessConfig{
Minimum: versioncontrol.VersionConfig{
Version: minimumVersion,
URL: createURL("storagenode-updater", minimumVersion),
},
Suggested: versioncontrol.VersionConfig{
Version: suggestedVersion,
URL: createURL("storagenode-updater", suggestedVersion),
},
},
Uplink: versioncontrol.ProcessConfig{
Minimum: versioncontrol.VersionConfig{
Version: minimumVersion,
URL: createURL("uplink", minimumVersion),
},
Suggested: versioncontrol.VersionConfig{
Version: suggestedVersion,
URL: createURL("uplink", suggestedVersion),
},
},
Gateway: versioncontrol.ProcessConfig{
Minimum: versioncontrol.VersionConfig{
Version: minimumVersion,
URL: createURL("gateway", minimumVersion),
},
Suggested: versioncontrol.VersionConfig{
Version: suggestedVersion,
URL: createURL("gateway", suggestedVersion),
},
},
Identity: versioncontrol.ProcessConfig{
Minimum: versioncontrol.VersionConfig{
Version: minimumVersion,
URL: createURL("identity", minimumVersion),
},
Suggested: versioncontrol.VersionConfig{
Version: suggestedVersion,
URL: createURL("identity", suggestedVersion),
},
},
},
}
log := zaptest.NewLogger(t)
peer, err := versioncontrol.New(log, config)
require.NoError(t, err)
require.NotNil(t, peer)
testCtx := testcontext.New(t)
ctx, cancel := context.WithCancel(testCtx)
var wg errgroup.Group
wg.Go(func() error {
return peer.Run(ctx)
})
defer testCtx.Check(peer.Close)
defer cancel()
baseURL := "http://" + peer.Addr()
t.Run("resolve process url", func(t *testing.T) {
queryTmpl := "processes/{service}/{version}/url?os={os}&arch={arch}"
urls := make(map[string]string)
for _, supportedBinary := range versioncontrol.SupportedBinaries {
splitted := strings.SplitN(supportedBinary, "_", 3)
service := splitted[0]
os := splitted[1]
arch := splitted[2]
for _, versionType := range []string{"minimum", "suggested"} {
query := strings.Replace(queryTmpl, "{service}", service, 1)
query = strings.Replace(query, "{version}", versionType, 1)
query = strings.Replace(query, "{os}", os, 1)
query = strings.Replace(query, "{arch}", arch, 1)
var url string
switch versionType {
case "minimum":
url = createURL(service, minimumVersion)
case "suggested":
url = createURL(service, suggestedVersion)
}
url = strings.Replace(url, "{os}", os, 1)
url = strings.Replace(url, "{arch}", arch, 1)
urls[query] = url
}
}
for query, url := range urls {
query, url := query, url
t.Run(query, func(t *testing.T) {
resp, err := http.Get(baseURL + "/" + query)
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode)
b, err := ioutil.ReadAll(resp.Body)
require.NoError(t, err)
require.NotNil(t, b)
require.NoError(t, resp.Body.Close())
require.Equal(t, url, string(b))
log.Debug(string(b))
})
}
})
}
func TestPeer_Run(t *testing.T) { func TestPeer_Run(t *testing.T) {
testVersion := "v0.0.1" testVersion := "v0.0.1"
testServiceVersions := versioncontrol.OldVersionConfig{ testServiceVersions := versioncontrol.OldVersionConfig{