certificate/authorization: implement gob to pb migration
Change-Id: I6f33f5802e3f0a3c8a5f0c3cff61ef836a645c41
This commit is contained in:
parent
10c552fec4
commit
d0686648db
@ -127,9 +127,13 @@ func ParseToken(tokenString string) (*Token, error) {
|
|||||||
return t, nil
|
return t, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func isGobEncoded(data []byte) bool {
|
||||||
|
return bytes.HasPrefix(data, []byte{0x14, 0xff, 0xb3, 0x2, 0x1, 0x1, 0x5, 0x47, 0x72})
|
||||||
|
}
|
||||||
|
|
||||||
// Unmarshal deserializes a set of authorizations.
|
// Unmarshal deserializes a set of authorizations.
|
||||||
func (group *Group) Unmarshal(data []byte) error {
|
func (group *Group) Unmarshal(data []byte) error {
|
||||||
if bytes.HasPrefix(data, []byte{0x14, 0xff, 0xb3, 0x2, 0x1, 0x1, 0x5, 0x47, 0x72}) {
|
if isGobEncoded(data) {
|
||||||
decoder := gob.NewDecoder(bytes.NewBuffer(data))
|
decoder := gob.NewDecoder(bytes.NewBuffer(data))
|
||||||
if err := decoder.Decode(group); err != nil {
|
if err := decoder.Decode(group); err != nil {
|
||||||
return Error.Wrap(err)
|
return Error.Wrap(err)
|
||||||
|
@ -296,3 +296,41 @@ func (authDB *DB) put(ctx context.Context, userID string, auths Group) (err erro
|
|||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// MigrateGob migrates gob encoded Group to protobuf encoded Group.
|
||||||
|
func (authDB *DB) MigrateGob(ctx context.Context, progress func(userID string)) (err error) {
|
||||||
|
defer mon.Task()(&ctx)(&err)
|
||||||
|
err = authDB.db.Iterate(ctx, storage.IterateOptions{
|
||||||
|
Recurse: true,
|
||||||
|
}, func(ctx context.Context, it storage.Iterator) error {
|
||||||
|
var item storage.ListItem
|
||||||
|
|
||||||
|
for it.Next(ctx, &item) {
|
||||||
|
if !isGobEncoded(item.Value) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if progress != nil {
|
||||||
|
progress(string(item.Key))
|
||||||
|
}
|
||||||
|
|
||||||
|
var group Group
|
||||||
|
if err := group.Unmarshal(item.Value); err != nil {
|
||||||
|
return ErrDBInternal.New("unmarshal failed key=%q: %w", item.Key, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
newValue, err := group.Marshal()
|
||||||
|
if err != nil {
|
||||||
|
return ErrDBInternal.New("re-marshal failed key=%q: %w", item.Key, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = authDB.db.CompareAndSwap(ctx, item.Key, item.Value, newValue)
|
||||||
|
if err != nil {
|
||||||
|
return ErrDBInternal.New("updating %q failed: %w", item.Key, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
return ErrDBInternal.Wrap(err)
|
||||||
|
}
|
||||||
|
@ -20,6 +20,7 @@ import (
|
|||||||
"storj.io/common/rpc/rpcpeer"
|
"storj.io/common/rpc/rpcpeer"
|
||||||
"storj.io/common/testcontext"
|
"storj.io/common/testcontext"
|
||||||
"storj.io/storj/certificate/certificatepb"
|
"storj.io/storj/certificate/certificatepb"
|
||||||
|
"storj.io/storj/private/testredis"
|
||||||
"storj.io/storj/storage"
|
"storj.io/storj/storage"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -414,3 +415,39 @@ func newTestAuthDB(t *testing.T, ctx *testcontext.Context) *DB {
|
|||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
return db
|
return db
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestMigrateGob_Redis(t *testing.T) {
|
||||||
|
ctx := testcontext.New(t)
|
||||||
|
|
||||||
|
server, err := testredis.Start(ctx)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer ctx.Check(server.Close)
|
||||||
|
|
||||||
|
db, err := OpenDB(ctx, "redis://"+server.Addr()+"?db=1", true)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer ctx.Check(db.Close)
|
||||||
|
|
||||||
|
require.NoError(t, db.db.Put(ctx, storage.Key("gob"), expectedGroupDataGob))
|
||||||
|
require.NoError(t, db.db.Put(ctx, storage.Key("pb"), expectedGroupDataProto))
|
||||||
|
|
||||||
|
count := 0
|
||||||
|
err = db.MigrateGob(ctx, func(userID string) {
|
||||||
|
count++
|
||||||
|
t.Log("migrating", userID)
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 1, count)
|
||||||
|
|
||||||
|
data, err := db.db.Get(ctx, storage.Key("gob"))
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.False(t, isGobEncoded(data))
|
||||||
|
require.Equal(t, expectedGroupDataProto, []byte(data))
|
||||||
|
|
||||||
|
data, err = db.db.Get(ctx, storage.Key("pb"))
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, expectedGroupDataProto, []byte(data))
|
||||||
|
}
|
||||||
|
@ -99,6 +99,7 @@ func main() {
|
|||||||
|
|
||||||
rootCmd.AddCommand(authCmd)
|
rootCmd.AddCommand(authCmd)
|
||||||
rootCmd.AddCommand(runCmd)
|
rootCmd.AddCommand(runCmd)
|
||||||
|
rootCmd.AddCommand(migrateCmd)
|
||||||
rootCmd.AddCommand(setupCmd)
|
rootCmd.AddCommand(setupCmd)
|
||||||
rootCmd.AddCommand(signCmd)
|
rootCmd.AddCommand(signCmd)
|
||||||
rootCmd.AddCommand(verifyCmd)
|
rootCmd.AddCommand(verifyCmd)
|
||||||
@ -113,6 +114,7 @@ func main() {
|
|||||||
process.Bind(authInfoCmd, &authCfg, defaults, cfgstruct.ConfDir(confDir))
|
process.Bind(authInfoCmd, &authCfg, defaults, cfgstruct.ConfDir(confDir))
|
||||||
process.Bind(authExportCmd, &authCfg, defaults, cfgstruct.ConfDir(confDir))
|
process.Bind(authExportCmd, &authCfg, defaults, cfgstruct.ConfDir(confDir))
|
||||||
process.Bind(runCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
|
process.Bind(runCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
|
||||||
|
process.Bind(migrateCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
|
||||||
process.Bind(setupCmd, &setupCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir), cfgstruct.SetupMode())
|
process.Bind(setupCmd, &setupCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir), cfgstruct.SetupMode())
|
||||||
process.Bind(signCmd, &signCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
|
process.Bind(signCmd, &signCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
|
||||||
process.Bind(verifyCmd, &verifyCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
|
process.Bind(verifyCmd, &verifyCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
|
||||||
|
42
cmd/certificates/migrate.go
Normal file
42
cmd/certificates/migrate.go
Normal file
@ -0,0 +1,42 @@
|
|||||||
|
// Copyright (C) 2023 Storj Labs, Inc.
|
||||||
|
// See LICENSE for copying information.
|
||||||
|
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/spf13/cobra"
|
||||||
|
"github.com/zeebo/errs"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
|
||||||
|
"storj.io/private/process"
|
||||||
|
"storj.io/storj/certificate/authorization"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
migrateCmd = &cobra.Command{
|
||||||
|
Use: "migrate-gob",
|
||||||
|
Short: "Migrate from gob encoding to protobuf encoding",
|
||||||
|
RunE: cmdMigrate,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
func cmdMigrate(cmd *cobra.Command, args []string) error {
|
||||||
|
ctx, _ := process.Ctx(cmd)
|
||||||
|
|
||||||
|
authorizationDB, err := authorization.OpenDBFromCfg(ctx, runCfg.AuthorizationDB)
|
||||||
|
if err != nil {
|
||||||
|
return errs.New("error opening authorizations database: %+v", err)
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
err = errs.Combine(err, authorizationDB.Close())
|
||||||
|
}()
|
||||||
|
|
||||||
|
log := zap.L()
|
||||||
|
count := 0
|
||||||
|
return authorizationDB.MigrateGob(ctx, func(userID string) {
|
||||||
|
if count%100 == 0 {
|
||||||
|
log.Info("progress", zap.String("user", userID), zap.Int("count", count))
|
||||||
|
}
|
||||||
|
count++
|
||||||
|
})
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user