private/testplanet: support writing monitoring spans
This allows to set `STORJ_TEST_MONKIT` to either `svg` or `json` to write individual testplanet test traces to disk. It also allows to specify an absolute directory: STORJ_TEST_MONKIT=svg,dir:/abs/dir/path It requires an absolute path, because from the context of tests, there's no easy way to find the folder where tests were called. Change-Id: I6fe008a4d4237d221cf5a5bede798b46399ee197
This commit is contained in:
parent
0209bc6ef7
commit
6511bb91fb
5
.gitignore
vendored
5
.gitignore
vendored
@ -35,4 +35,7 @@ resource.syso
|
||||
|
||||
*.resource.go
|
||||
|
||||
.build
|
||||
.build
|
||||
|
||||
# Test debug output
|
||||
*.test.*
|
2
go.mod
2
go.mod
@ -61,7 +61,7 @@ require (
|
||||
gopkg.in/ini.v1 v1.62.0 // indirect
|
||||
gopkg.in/segmentio/analytics-go.v3 v3.1.0
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c
|
||||
storj.io/common v0.0.0-20210928125533-ecbc7f49b8a4
|
||||
storj.io/common v0.0.0-20210928143209-230bee624465
|
||||
storj.io/drpc v0.0.26
|
||||
storj.io/monkit-jaeger v0.0.0-20210426161729-debb1cbcbbd7
|
||||
storj.io/private v0.0.0-20210810102517-434aeab3f17d
|
||||
|
4
go.sum
4
go.sum
@ -879,8 +879,8 @@ sourcegraph.com/sqs/pbtypes v0.0.0-20180604144634-d3ebe8f20ae4/go.mod h1:ketZ/q3
|
||||
storj.io/common v0.0.0-20200424175742-65ac59022f4f/go.mod h1:pZyXiIE7bGETIRXtfs0nICqMwp7PM8HqnDuyUeldNA0=
|
||||
storj.io/common v0.0.0-20210805073808-8e0feb09e92a/go.mod h1:mhZYWpTojKsACxWE66RfXNz19zbyr/uEDVWHJH8dHog=
|
||||
storj.io/common v0.0.0-20210916151047-6aaeb34bb916/go.mod h1:objobGrIWQwhmTSpSm6Y7ykd40wZjB7CezNfic5YLKg=
|
||||
storj.io/common v0.0.0-20210928125533-ecbc7f49b8a4 h1:G0ePy32obot7DamdE/KcwwKcquE2r6T2pG0G6qyhr6Y=
|
||||
storj.io/common v0.0.0-20210928125533-ecbc7f49b8a4/go.mod h1:objobGrIWQwhmTSpSm6Y7ykd40wZjB7CezNfic5YLKg=
|
||||
storj.io/common v0.0.0-20210928143209-230bee624465 h1:sEVHROo+9i7+Sl3zuR8OKQnykJ2MmPMsRygjvUkiBns=
|
||||
storj.io/common v0.0.0-20210928143209-230bee624465/go.mod h1:objobGrIWQwhmTSpSm6Y7ykd40wZjB7CezNfic5YLKg=
|
||||
storj.io/drpc v0.0.11/go.mod h1:TiFc2obNjL9/3isMW1Rpxjy8V9uE0B2HMeMFGiiI7Iw=
|
||||
storj.io/drpc v0.0.24/go.mod h1:ofQUDPQbbIymRDKE0tms48k8bLP5Y+dsI9CbXGv3gko=
|
||||
storj.io/drpc v0.0.26 h1:T6jJzjby7QUa/2XHR1qMxTCENpDHEw4/o+kfDfZQqQI=
|
||||
|
@ -16,6 +16,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/spacemonkeygo/monkit/v3"
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/sync/errgroup"
|
||||
@ -30,6 +31,8 @@ import (
|
||||
"storj.io/storj/versioncontrol"
|
||||
)
|
||||
|
||||
var mon = monkit.Package()
|
||||
|
||||
const defaultInterval = 15 * time.Second
|
||||
|
||||
// Peer represents one of StorageNode or Satellite.
|
||||
@ -180,6 +183,8 @@ func NewCustom(ctx context.Context, log *zap.Logger, config Config, satelliteDat
|
||||
|
||||
// Start starts all the nodes.
|
||||
func (planet *Planet) Start(ctx context.Context) {
|
||||
defer mon.Task()(&ctx)(nil)
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
planet.cancel = cancel
|
||||
|
||||
@ -234,8 +239,10 @@ func (planet *Planet) StopPeer(peer Peer) error {
|
||||
}
|
||||
|
||||
// StopNodeAndUpdate stops storage node and updates satellite overlay.
|
||||
func (planet *Planet) StopNodeAndUpdate(ctx context.Context, node *StorageNode) error {
|
||||
err := planet.StopPeer(node)
|
||||
func (planet *Planet) StopNodeAndUpdate(ctx context.Context, node *StorageNode) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
err = planet.StopPeer(node)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -4,10 +4,18 @@
|
||||
package testplanet
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime/pprof"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/spacemonkeygo/monkit/v3/collect"
|
||||
"github.com/spacemonkeygo/monkit/v3/present"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/common/testcontext"
|
||||
"storj.io/private/dbutil/pgtest"
|
||||
"storj.io/storj/satellite/satellitedb/satellitedbtest"
|
||||
@ -31,35 +39,114 @@ func Run(t *testing.T, config Config, test func(t *testing.T, ctx *testcontext.C
|
||||
t.Parallel()
|
||||
}
|
||||
|
||||
ctx := testcontext.New(t)
|
||||
defer ctx.Cleanup()
|
||||
|
||||
if satelliteDB.MasterDB.URL == "" {
|
||||
t.Skipf("Database %s connection string not provided. %s", satelliteDB.MasterDB.Name, satelliteDB.MasterDB.Message)
|
||||
}
|
||||
|
||||
planetConfig := config
|
||||
if planetConfig.Name == "" {
|
||||
planetConfig.Name = t.Name()
|
||||
}
|
||||
|
||||
pprof.Do(ctx, pprof.Labels("planet", planetConfig.Name), func(namedctx context.Context) {
|
||||
planet, err := NewCustom(namedctx, newLogger(t), planetConfig, satelliteDB)
|
||||
if err != nil {
|
||||
t.Fatalf("%+v", err)
|
||||
log := newLogger(t)
|
||||
|
||||
startPlanetAndTest := func(parent context.Context) {
|
||||
ctx := testcontext.NewWithContext(parent, t)
|
||||
defer ctx.Cleanup()
|
||||
|
||||
pprof.Do(ctx, pprof.Labels("planet", planetConfig.Name), func(namedctx context.Context) {
|
||||
planet, err := NewCustom(namedctx, log, planetConfig, satelliteDB)
|
||||
if err != nil {
|
||||
t.Fatalf("%+v", err)
|
||||
}
|
||||
defer ctx.Check(planet.Shutdown)
|
||||
|
||||
planet.Start(namedctx)
|
||||
|
||||
provisionUplinks(namedctx, t, planet)
|
||||
|
||||
test(t, ctx, planet)
|
||||
})
|
||||
}
|
||||
|
||||
monkitConfig := os.Getenv("STORJ_TEST_MONKIT")
|
||||
if monkitConfig == "" {
|
||||
startPlanetAndTest(context.Background())
|
||||
} else {
|
||||
flags := parseMonkitFlags(monkitConfig)
|
||||
outDir := flags["dir"]
|
||||
if outDir != "" {
|
||||
if !filepath.IsAbs(outDir) {
|
||||
t.Fatalf("testplanet-monkit: dir must be an absolute path, but was %q", outDir)
|
||||
}
|
||||
}
|
||||
defer ctx.Check(planet.Shutdown)
|
||||
outType := flags["type"]
|
||||
|
||||
planet.Start(namedctx)
|
||||
rootctx := context.Background()
|
||||
|
||||
provisionUplinks(namedctx, t, planet)
|
||||
done := mon.Task()(&rootctx)
|
||||
spans := collect.CollectSpans(rootctx, startPlanetAndTest)
|
||||
done(nil)
|
||||
|
||||
test(t, ctx, planet)
|
||||
})
|
||||
outPath := filepath.Join(outDir, sanitizeFileName(planetConfig.Name))
|
||||
var data bytes.Buffer
|
||||
|
||||
switch outType {
|
||||
default: // also svg
|
||||
if outType != "svg" {
|
||||
t.Logf("testplanet-monkit: unknown output type %q defaulting to svg", outType)
|
||||
}
|
||||
outPath += ".test.svg"
|
||||
err := present.SpansToSVG(&data, spans)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
case "json":
|
||||
outPath += ".test.json"
|
||||
err := present.SpansToJSON(&data, spans)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
err := os.WriteFile(outPath, data.Bytes(), 0644)
|
||||
if err != nil {
|
||||
log.Error("failed to write svg", zap.String("path", outPath), zap.Error(err))
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func parseMonkitFlags(s string) map[string]string {
|
||||
r := make(map[string]string)
|
||||
for _, tag := range strings.Split(s, ",") {
|
||||
tokens := strings.SplitN(tag, "=", 2)
|
||||
if len(tokens) <= 1 {
|
||||
r["type"] = strings.TrimSpace(tag)
|
||||
continue
|
||||
}
|
||||
key, value := strings.TrimSpace(tokens[0]), strings.TrimSpace(tokens[1])
|
||||
r[key] = value
|
||||
}
|
||||
return r
|
||||
}
|
||||
|
||||
func sanitizeFileName(s string) string {
|
||||
var b strings.Builder
|
||||
for _, x := range s {
|
||||
switch {
|
||||
case 'a' <= x && x <= 'z':
|
||||
b.WriteRune(x)
|
||||
case 'A' <= x && x <= 'Z':
|
||||
b.WriteRune(x)
|
||||
case '0' <= x && x <= '9':
|
||||
b.WriteRune(x)
|
||||
}
|
||||
}
|
||||
return b.String()
|
||||
}
|
||||
|
||||
func provisionUplinks(ctx context.Context, t *testing.T, planet *Planet) {
|
||||
for _, planetUplink := range planet.Uplinks {
|
||||
for _, satellite := range planet.Satellites {
|
||||
|
@ -213,7 +213,9 @@ func (system *Satellite) NodeURL() storj.NodeURL {
|
||||
|
||||
// AddUser adds user to a satellite. Password from newUser will be always overridden by FullName to have
|
||||
// known password which can be used automatically.
|
||||
func (system *Satellite) AddUser(ctx context.Context, newUser console.CreateUser, maxNumberOfProjects int) (*console.User, error) {
|
||||
func (system *Satellite) AddUser(ctx context.Context, newUser console.CreateUser, maxNumberOfProjects int) (_ *console.User, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
regToken, err := system.API.Console.Service.CreateRegToken(ctx, maxNumberOfProjects)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -248,7 +250,9 @@ func (system *Satellite) AddUser(ctx context.Context, newUser console.CreateUser
|
||||
}
|
||||
|
||||
// AddProject adds project to a satellite and makes specified user an owner.
|
||||
func (system *Satellite) AddProject(ctx context.Context, ownerID uuid.UUID, name string) (*console.Project, error) {
|
||||
func (system *Satellite) AddProject(ctx context.Context, ownerID uuid.UUID, name string) (_ *console.Project, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
authCtx, err := system.AuthenticatedContext(ctx, ownerID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -263,7 +267,9 @@ func (system *Satellite) AddProject(ctx context.Context, ownerID uuid.UUID, name
|
||||
}
|
||||
|
||||
// AuthenticatedContext creates context with authentication date for given user.
|
||||
func (system *Satellite) AuthenticatedContext(ctx context.Context, userID uuid.UUID) (context.Context, error) {
|
||||
func (system *Satellite) AuthenticatedContext(ctx context.Context, userID uuid.UUID) (_ context.Context, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
user, err := system.API.Console.Service.GetUser(ctx, userID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -328,7 +334,9 @@ func (system *Satellite) Run(ctx context.Context) (err error) {
|
||||
func (system *Satellite) PrivateAddr() string { return system.API.Server.PrivateAddr().String() }
|
||||
|
||||
// newSatellites initializes satellites.
|
||||
func (planet *Planet) newSatellites(ctx context.Context, count int, databases satellitedbtest.SatelliteDatabases) ([]*Satellite, error) {
|
||||
func (planet *Planet) newSatellites(ctx context.Context, count int, databases satellitedbtest.SatelliteDatabases) (_ []*Satellite, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
var satellites []*Satellite
|
||||
|
||||
for i := 0; i < count; i++ {
|
||||
@ -354,7 +362,9 @@ func (planet *Planet) newSatellites(ctx context.Context, count int, databases sa
|
||||
return satellites, nil
|
||||
}
|
||||
|
||||
func (planet *Planet) newSatellite(ctx context.Context, prefix string, index int, log *zap.Logger, databases satellitedbtest.SatelliteDatabases) (*Satellite, error) {
|
||||
func (planet *Planet) newSatellite(ctx context.Context, prefix string, index int, log *zap.Logger, databases satellitedbtest.SatelliteDatabases) (_ *Satellite, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
storageDir := filepath.Join(planet.directory, prefix)
|
||||
if err := os.MkdirAll(storageDir, 0700); err != nil {
|
||||
return nil, err
|
||||
@ -613,10 +623,11 @@ func createNewSystem(name string, log *zap.Logger, config satellite.Config, peer
|
||||
return system
|
||||
}
|
||||
|
||||
func (planet *Planet) newAPI(ctx context.Context, index int, identity *identity.FullIdentity, db satellite.DB, metabaseDB *metabase.DB, config satellite.Config, versionInfo version.Info) (*satellite.API, error) {
|
||||
func (planet *Planet) newAPI(ctx context.Context, index int, identity *identity.FullIdentity, db satellite.DB, metabaseDB *metabase.DB, config satellite.Config, versionInfo version.Info) (_ *satellite.API, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
prefix := "satellite-api" + strconv.Itoa(index)
|
||||
log := planet.log.Named(prefix)
|
||||
var err error
|
||||
|
||||
revocationDB, err := revocation.OpenDBFromCfg(ctx, config.Server.Config)
|
||||
if err != nil {
|
||||
@ -636,14 +647,18 @@ func (planet *Planet) newAPI(ctx context.Context, index int, identity *identity.
|
||||
return satellite.NewAPI(log, identity, db, metabaseDB, revocationDB, liveAccounting, rollupsWriteCache, &config, versionInfo, nil)
|
||||
}
|
||||
|
||||
func (planet *Planet) newAdmin(ctx context.Context, index int, identity *identity.FullIdentity, db satellite.DB, config satellite.Config, versionInfo version.Info) (*satellite.Admin, error) {
|
||||
func (planet *Planet) newAdmin(ctx context.Context, index int, identity *identity.FullIdentity, db satellite.DB, config satellite.Config, versionInfo version.Info) (_ *satellite.Admin, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
prefix := "satellite-admin" + strconv.Itoa(index)
|
||||
log := planet.log.Named(prefix)
|
||||
|
||||
return satellite.NewAdmin(log, identity, db, versionInfo, &config, nil)
|
||||
}
|
||||
|
||||
func (planet *Planet) newRepairer(ctx context.Context, index int, identity *identity.FullIdentity, db satellite.DB, metabaseDB *metabase.DB, config satellite.Config, versionInfo version.Info) (*satellite.Repairer, error) {
|
||||
func (planet *Planet) newRepairer(ctx context.Context, index int, identity *identity.FullIdentity, db satellite.DB, metabaseDB *metabase.DB, config satellite.Config, versionInfo version.Info) (_ *satellite.Repairer, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
prefix := "satellite-repairer" + strconv.Itoa(index)
|
||||
log := planet.log.Named(prefix)
|
||||
|
||||
@ -667,7 +682,9 @@ func (cache rollupsWriteCacheCloser) Close() error {
|
||||
return cache.RollupsWriteCache.CloseAndFlush(context.TODO())
|
||||
}
|
||||
|
||||
func (planet *Planet) newGarbageCollection(ctx context.Context, index int, identity *identity.FullIdentity, db satellite.DB, metabaseDB *metabase.DB, config satellite.Config, versionInfo version.Info) (*satellite.GarbageCollection, error) {
|
||||
func (planet *Planet) newGarbageCollection(ctx context.Context, index int, identity *identity.FullIdentity, db satellite.DB, metabaseDB *metabase.DB, config satellite.Config, versionInfo version.Info) (_ *satellite.GarbageCollection, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
prefix := "satellite-gc" + strconv.Itoa(index)
|
||||
log := planet.log.Named(prefix)
|
||||
|
||||
|
@ -12,7 +12,9 @@ import (
|
||||
|
||||
// WaitForStorageNodeEndpoints waits for storage node endpoints to finish their work.
|
||||
// The call will return an error if they have not been completed after 1 minute.
|
||||
func (planet *Planet) WaitForStorageNodeEndpoints(ctx context.Context) error {
|
||||
func (planet *Planet) WaitForStorageNodeEndpoints(ctx context.Context) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
timeout := time.NewTimer(time.Minute)
|
||||
defer timeout.Stop()
|
||||
for {
|
||||
@ -40,6 +42,8 @@ func (planet *Planet) storageNodeLiveRequestCount() int {
|
||||
|
||||
// WaitForStorageNodeDeleters calls the Wait method on each storagenode's PieceDeleter.
|
||||
func (planet *Planet) WaitForStorageNodeDeleters(ctx context.Context) {
|
||||
defer mon.Task()(&ctx)(nil)
|
||||
|
||||
for _, sn := range planet.StorageNodes {
|
||||
sn.Peer.Storage2.PieceDeleter.Wait(ctx)
|
||||
}
|
||||
|
@ -61,7 +61,9 @@ func (system *StorageNode) NodeURL() storj.NodeURL {
|
||||
}
|
||||
|
||||
// newStorageNodes initializes storage nodes.
|
||||
func (planet *Planet) newStorageNodes(ctx context.Context, count int, whitelistedSatellites storj.NodeURLs) ([]*StorageNode, error) {
|
||||
func (planet *Planet) newStorageNodes(ctx context.Context, count int, whitelistedSatellites storj.NodeURLs) (_ []*StorageNode, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
var sources []trust.Source
|
||||
for _, u := range whitelistedSatellites {
|
||||
source, err := trust.NewStaticURLSource(u.String())
|
||||
@ -93,7 +95,9 @@ func (planet *Planet) newStorageNodes(ctx context.Context, count int, whiteliste
|
||||
return xs, nil
|
||||
}
|
||||
|
||||
func (planet *Planet) newStorageNode(ctx context.Context, prefix string, index, count int, log *zap.Logger, sources []trust.Source) (*StorageNode, error) {
|
||||
func (planet *Planet) newStorageNode(ctx context.Context, prefix string, index, count int, log *zap.Logger, sources []trust.Source) (_ *StorageNode, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
storageDir := filepath.Join(planet.directory, prefix)
|
||||
if err := os.MkdirAll(storageDir, 0700); err != nil {
|
||||
return nil, err
|
||||
|
@ -71,12 +71,15 @@ type UserLogin struct {
|
||||
}
|
||||
|
||||
// DialMetainfo dials the satellite with the appropriate api key.
|
||||
func (project *Project) DialMetainfo(ctx context.Context) (*metaclient.Client, error) {
|
||||
func (project *Project) DialMetainfo(ctx context.Context) (_ *metaclient.Client, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
return project.client.DialMetainfo(ctx, project.Satellite, project.RawAPIKey)
|
||||
}
|
||||
|
||||
// newUplinks creates initializes uplinks, requires peer to have at least one satellite.
|
||||
func (planet *Planet) newUplinks(ctx context.Context, prefix string, count int) ([]*Uplink, error) {
|
||||
func (planet *Planet) newUplinks(ctx context.Context, prefix string, count int) (_ []*Uplink, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
var xs []*Uplink
|
||||
for i := 0; i < count; i++ {
|
||||
name := prefix + strconv.Itoa(i)
|
||||
@ -95,7 +98,9 @@ func (planet *Planet) newUplinks(ctx context.Context, prefix string, count int)
|
||||
}
|
||||
|
||||
// newUplink creates a new uplink.
|
||||
func (planet *Planet) newUplink(ctx context.Context, name string) (*Uplink, error) {
|
||||
func (planet *Planet) newUplink(ctx context.Context, name string) (_ *Uplink, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
identity, err := planet.NewIdentity()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -184,32 +189,37 @@ func (client *Uplink) Addr() string { return "" }
|
||||
func (client *Uplink) Shutdown() error { return nil }
|
||||
|
||||
// DialMetainfo dials destination with apikey and returns metainfo Client.
|
||||
func (client *Uplink) DialMetainfo(ctx context.Context, destination Peer, apikey *macaroon.APIKey) (*metaclient.Client, error) {
|
||||
func (client *Uplink) DialMetainfo(ctx context.Context, destination Peer, apikey *macaroon.APIKey) (_ *metaclient.Client, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
return metaclient.DialNodeURL(ctx, client.Dialer, destination.NodeURL().String(), apikey, "Test/1.0")
|
||||
}
|
||||
|
||||
// DialPiecestore dials destination storagenode and returns a piecestore client.
|
||||
func (client *Uplink) DialPiecestore(ctx context.Context, destination Peer) (*piecestore.Client, error) {
|
||||
func (client *Uplink) DialPiecestore(ctx context.Context, destination Peer) (_ *piecestore.Client, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
return piecestore.Dial(ctx, client.Dialer, destination.NodeURL(), piecestore.DefaultConfig)
|
||||
}
|
||||
|
||||
// OpenProject opens project with predefined access grant and gives access to pure uplink API.
|
||||
func (client *Uplink) OpenProject(ctx context.Context, satellite *Satellite) (*uplink.Project, error) {
|
||||
func (client *Uplink) OpenProject(ctx context.Context, satellite *Satellite) (_ *uplink.Project, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
_, found := testuplink.GetMaxSegmentSize(ctx)
|
||||
if !found {
|
||||
ctx = testuplink.WithMaxSegmentSize(ctx, satellite.Config.Metainfo.MaxSegmentSize)
|
||||
}
|
||||
|
||||
return uplink.OpenProject(ctx, client.Access[satellite.ID()])
|
||||
}
|
||||
|
||||
// Upload data to specific satellite.
|
||||
func (client *Uplink) Upload(ctx context.Context, satellite *Satellite, bucket string, path storj.Path, data []byte) error {
|
||||
func (client *Uplink) Upload(ctx context.Context, satellite *Satellite, bucket string, path storj.Path, data []byte) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
return client.UploadWithExpiration(ctx, satellite, bucket, path, data, time.Time{})
|
||||
}
|
||||
|
||||
// UploadWithExpiration data to specific satellite and expiration time.
|
||||
func (client *Uplink) UploadWithExpiration(ctx context.Context, satellite *Satellite, bucketName string, path storj.Path, data []byte, expiration time.Time) error {
|
||||
func (client *Uplink) UploadWithExpiration(ctx context.Context, satellite *Satellite, bucketName string, path storj.Path, data []byte, expiration time.Time) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
_, found := testuplink.GetMaxSegmentSize(ctx)
|
||||
if !found {
|
||||
ctx = testuplink.WithMaxSegmentSize(ctx, satellite.Config.Metainfo.MaxSegmentSize)
|
||||
@ -244,7 +254,9 @@ func (client *Uplink) UploadWithExpiration(ctx context.Context, satellite *Satel
|
||||
}
|
||||
|
||||
// Download data from specific satellite.
|
||||
func (client *Uplink) Download(ctx context.Context, satellite *Satellite, bucketName string, path storj.Path) ([]byte, error) {
|
||||
func (client *Uplink) Download(ctx context.Context, satellite *Satellite, bucketName string, path storj.Path) (_ []byte, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
project, err := client.GetProject(ctx, satellite)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -266,6 +278,8 @@ func (client *Uplink) Download(ctx context.Context, satellite *Satellite, bucket
|
||||
|
||||
// DownloadStream returns stream for downloading data.
|
||||
func (client *Uplink) DownloadStream(ctx context.Context, satellite *Satellite, bucketName string, path storj.Path) (_ io.ReadCloser, cleanup func() error, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
project, err := client.GetProject(ctx, satellite)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
@ -284,6 +298,8 @@ func (client *Uplink) DownloadStream(ctx context.Context, satellite *Satellite,
|
||||
|
||||
// DownloadStreamRange returns stream for downloading data.
|
||||
func (client *Uplink) DownloadStreamRange(ctx context.Context, satellite *Satellite, bucketName string, path storj.Path, start, limit int64) (_ io.ReadCloser, cleanup func() error, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
project, err := client.GetProject(ctx, satellite)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
@ -304,7 +320,9 @@ func (client *Uplink) DownloadStreamRange(ctx context.Context, satellite *Satell
|
||||
}
|
||||
|
||||
// DeleteObject deletes an object at the path in a bucket.
|
||||
func (client *Uplink) DeleteObject(ctx context.Context, satellite *Satellite, bucketName string, path storj.Path) error {
|
||||
func (client *Uplink) DeleteObject(ctx context.Context, satellite *Satellite, bucketName string, path storj.Path) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
project, err := client.GetProject(ctx, satellite)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -319,7 +337,9 @@ func (client *Uplink) DeleteObject(ctx context.Context, satellite *Satellite, bu
|
||||
}
|
||||
|
||||
// CreateBucket creates a new bucket.
|
||||
func (client *Uplink) CreateBucket(ctx context.Context, satellite *Satellite, bucketName string) error {
|
||||
func (client *Uplink) CreateBucket(ctx context.Context, satellite *Satellite, bucketName string) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
project, err := client.GetProject(ctx, satellite)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -334,7 +354,9 @@ func (client *Uplink) CreateBucket(ctx context.Context, satellite *Satellite, bu
|
||||
}
|
||||
|
||||
// DeleteBucket deletes a bucket.
|
||||
func (client *Uplink) DeleteBucket(ctx context.Context, satellite *Satellite, bucketName string) error {
|
||||
func (client *Uplink) DeleteBucket(ctx context.Context, satellite *Satellite, bucketName string) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
project, err := client.GetProject(ctx, satellite)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -349,7 +371,9 @@ func (client *Uplink) DeleteBucket(ctx context.Context, satellite *Satellite, bu
|
||||
}
|
||||
|
||||
// ListBuckets returns a list of all buckets in a project.
|
||||
func (client *Uplink) ListBuckets(ctx context.Context, satellite *Satellite) ([]*uplink.Bucket, error) {
|
||||
func (client *Uplink) ListBuckets(ctx context.Context, satellite *Satellite) (_ []*uplink.Bucket, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
var buckets = []*uplink.Bucket{}
|
||||
project, err := client.GetProject(ctx, satellite)
|
||||
if err != nil {
|
||||
@ -365,7 +389,9 @@ func (client *Uplink) ListBuckets(ctx context.Context, satellite *Satellite) ([]
|
||||
}
|
||||
|
||||
// ListObjects returns a list of all objects in a bucket.
|
||||
func (client *Uplink) ListObjects(ctx context.Context, satellite *Satellite, bucketName string) ([]*uplink.Object, error) {
|
||||
func (client *Uplink) ListObjects(ctx context.Context, satellite *Satellite, bucketName string) (_ []*uplink.Object, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
var objects = []*uplink.Object{}
|
||||
project, err := client.GetProject(ctx, satellite)
|
||||
if err != nil {
|
||||
@ -381,7 +407,9 @@ func (client *Uplink) ListObjects(ctx context.Context, satellite *Satellite, buc
|
||||
}
|
||||
|
||||
// GetProject returns a uplink.Project which allows interactions with a specific project.
|
||||
func (client *Uplink) GetProject(ctx context.Context, satellite *Satellite) (*uplink.Project, error) {
|
||||
func (client *Uplink) GetProject(ctx context.Context, satellite *Satellite) (_ *uplink.Project, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
access := client.Access[satellite.ID()]
|
||||
|
||||
project, err := client.Config.OpenProject(ctx, access)
|
||||
|
@ -16,7 +16,6 @@ import (
|
||||
|
||||
// newVersionControlServer initializes the Versioning Server.
|
||||
func (planet *Planet) newVersionControlServer() (peer *versioncontrol.Peer, err error) {
|
||||
|
||||
prefix := "versioncontrol"
|
||||
log := planet.log.Named(prefix)
|
||||
dbDir := filepath.Join(planet.directory, prefix)
|
||||
|
@ -8,6 +8,6 @@ require (
|
||||
github.com/go-rod/rod v0.100.0
|
||||
github.com/stretchr/testify v1.7.0
|
||||
go.uber.org/zap v1.17.0
|
||||
storj.io/common v0.0.0-20210928125533-ecbc7f49b8a4
|
||||
storj.io/common v0.0.0-20210928143209-230bee624465
|
||||
storj.io/storj v0.12.1-0.20210916114455-b2d724962c24
|
||||
)
|
||||
|
@ -841,8 +841,8 @@ sourcegraph.com/sqs/pbtypes v0.0.0-20180604144634-d3ebe8f20ae4/go.mod h1:ketZ/q3
|
||||
storj.io/common v0.0.0-20200424175742-65ac59022f4f/go.mod h1:pZyXiIE7bGETIRXtfs0nICqMwp7PM8HqnDuyUeldNA0=
|
||||
storj.io/common v0.0.0-20210805073808-8e0feb09e92a/go.mod h1:mhZYWpTojKsACxWE66RfXNz19zbyr/uEDVWHJH8dHog=
|
||||
storj.io/common v0.0.0-20210916151047-6aaeb34bb916/go.mod h1:objobGrIWQwhmTSpSm6Y7ykd40wZjB7CezNfic5YLKg=
|
||||
storj.io/common v0.0.0-20210928125533-ecbc7f49b8a4 h1:G0ePy32obot7DamdE/KcwwKcquE2r6T2pG0G6qyhr6Y=
|
||||
storj.io/common v0.0.0-20210928125533-ecbc7f49b8a4/go.mod h1:objobGrIWQwhmTSpSm6Y7ykd40wZjB7CezNfic5YLKg=
|
||||
storj.io/common v0.0.0-20210928143209-230bee624465 h1:sEVHROo+9i7+Sl3zuR8OKQnykJ2MmPMsRygjvUkiBns=
|
||||
storj.io/common v0.0.0-20210928143209-230bee624465/go.mod h1:objobGrIWQwhmTSpSm6Y7ykd40wZjB7CezNfic5YLKg=
|
||||
storj.io/drpc v0.0.11/go.mod h1:TiFc2obNjL9/3isMW1Rpxjy8V9uE0B2HMeMFGiiI7Iw=
|
||||
storj.io/drpc v0.0.24/go.mod h1:ofQUDPQbbIymRDKE0tms48k8bLP5Y+dsI9CbXGv3gko=
|
||||
storj.io/drpc v0.0.26 h1:T6jJzjby7QUa/2XHR1qMxTCENpDHEw4/o+kfDfZQqQI=
|
||||
|
Loading…
Reference in New Issue
Block a user