storagenode/db: preflight improve index discovery

Change-Id: I876b321f6cd4e91dfced87aa4d39f2cf9a8e63d0
Yaroslav Vorobiov 2020-06-02 17:15:59 +03:00
parent 9a04ca0527
commit 09ca382abf
4 changed files with 113 additions and 30 deletions


@@ -136,7 +136,7 @@ func (schema *Schema) EnsureTable(tableName string) *Table {
 	return table
 }
 
-// DropTable removes the specified table
+// DropTable removes the specified table.
 func (schema *Schema) DropTable(tableName string) {
 	for i, table := range schema.Tables {
 		if table.Name == tableName {
@@ -156,12 +156,33 @@ func (schema *Schema) DropTable(tableName string) {
 	schema.Indexes = schema.Indexes[:j:j]
 }
 
+// FindIndex finds index in the schema.
+func (schema *Schema) FindIndex(name string) (*Index, bool) {
+	for _, idx := range schema.Indexes {
+		if idx.Name == name {
+			return idx, true
+		}
+	}
+
+	return nil, false
+}
+
+// DropIndex removes the specified index.
+func (schema *Schema) DropIndex(name string) {
+	for i, idx := range schema.Indexes {
+		if idx.Name == name {
+			schema.Indexes = append(schema.Indexes[:i], schema.Indexes[i+1:]...)
+			return
+		}
+	}
+}
+
 // AddColumn adds the column to the table.
 func (table *Table) AddColumn(column *Column) {
 	table.Columns = append(table.Columns, column)
 }
 
-// FindColumn finds a column in the table
+// FindColumn finds a column in the table.
 func (table *Table) FindColumn(columnName string) (*Column, bool) {
 	for _, column := range table.Columns {
 		if column.Name == columnName {
@@ -171,7 +192,7 @@ func (table *Table) FindColumn(columnName string) (*Column, bool) {
 	return nil, false
 }
 
-// ColumnNames returns column names
+// ColumnNames returns column names.
 func (table *Table) ColumnNames() []string {
 	columns := make([]string, len(table.Columns))
 	for i, column := range table.Columns {
@@ -180,7 +201,7 @@ func (table *Table) ColumnNames() []string {
 	return columns
 }
 
-// Sort sorts tables and indexes
+// Sort sorts tables and indexes.
 func (schema *Schema) Sort() {
 	sort.Slice(schema.Tables, func(i, k int) bool {
 		return schema.Tables[i].Name < schema.Tables[k].Name
@@ -200,7 +221,7 @@ func (schema *Schema) Sort() {
 	})
 }
 
-// Sort sorts columns, primary keys and unique
+// Sort sorts columns, primary keys and unique.
 func (table *Table) Sort() {
 	sort.Slice(table.Columns, func(i, k int) bool {
 		return table.Columns[i].Name < table.Columns[k].Name
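The FindIndex and DropIndex helpers added above are plain linear scans over Schema.Indexes, and DropIndex is a no-op for names it does not find. A minimal usage sketch follows; the index names are invented, and only the dbschema API shown in this commit is assumed:

package main

import (
	"fmt"

	"storj.io/storj/private/dbutil/dbschema"
)

func main() {
	// Build a schema with two indexes, mirroring the literal style used in the
	// expected-schema definitions further down in this commit.
	schema := &dbschema.Schema{
		Indexes: []*dbschema.Index{
			{Name: "idx_orders", Table: "unsent_order", Columns: []string{"satellite_id", "serial_number"}, Unique: true},
			{Name: "idx_scratch", Table: "scratch", Columns: []string{"created_at"}}, // hypothetical
		},
	}

	// FindIndex reports whether an index with the given name exists.
	if idx, ok := schema.FindIndex("idx_orders"); ok {
		fmt.Println("found:", idx.Name, idx.Columns)
	}

	// DropIndex removes an index by name; unknown names are left alone.
	schema.DropIndex("idx_scratch")
	_, ok := schema.FindIndex("idx_scratch")
	fmt.Println("idx_scratch still present:", ok) // false
}

That no-op behaviour is what preflight relies on below: it calls DropIndex only for the extra indexes it has decided to tolerate.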


@@ -7,6 +7,7 @@ import (
 	"context"
 	"database/sql"
 	"regexp"
+	"sort"
 	"strings"
 
 	"github.com/zeebo/errs"
@@ -15,8 +16,9 @@ import (
 )
 
 type definition struct {
 	name  string
-	sql   string
+	table string
+	sql   string
 }
 
 // QuerySchema loads the schema from sqlite database.
@@ -29,7 +31,7 @@ func QuerySchema(ctx context.Context, db dbschema.Queryer) (*dbschema.Schema, er
 	// find tables and indexes
 	err := func() error {
 		rows, err := db.QueryContext(ctx, `
-			SELECT name, type, sql FROM sqlite_master WHERE sql NOT NULL AND name NOT LIKE 'sqlite_%'
+			SELECT name, tbl_name, type, sql FROM sqlite_master WHERE sql NOT NULL AND name NOT LIKE 'sqlite_%'
 		`)
 		if err != nil {
 			return errs.Wrap(err)
@@ -37,15 +39,15 @@ func QuerySchema(ctx context.Context, db dbschema.Queryer) (*dbschema.Schema, er
 		defer func() { err = errs.Combine(err, rows.Close()) }()
 
 		for rows.Next() {
-			var defName, defType, defSQL string
-			err := rows.Scan(&defName, &defType, &defSQL)
+			var defName, defTblName, defType, defSQL string
+			err := rows.Scan(&defName, &defTblName, &defType, &defSQL)
 			if err != nil {
 				return errs.Wrap(err)
 			}
 
 			if defType == "table" {
 				tableDefinitions = append(tableDefinitions, &definition{name: defName, sql: defSQL})
 			} else if defType == "index" {
-				indexDefinitions = append(indexDefinitions, &definition{name: defName, sql: defSQL})
+				indexDefinitions = append(indexDefinitions, &definition{name: defName, sql: defSQL, table: defTblName})
 			}
 		}
@@ -153,8 +155,10 @@ func discoverIndexes(ctx context.Context, db dbschema.Queryer, schema *dbschema.
 	// TODO improve indexes discovery
 	for _, definition := range indexDefinitions {
 		index := &dbschema.Index{
-			Name: definition.name,
+			Name:  definition.name,
+			Table: definition.table,
 		}
 		schema.Indexes = append(schema.Indexes, index)
 
 		indexRows, err := db.QueryContext(ctx, `PRAGMA index_info(`+definition.name+`)`)
@@ -163,24 +167,61 @@ func discoverIndexes(ctx context.Context, db dbschema.Queryer, schema *dbschema.
 		}
 		defer func() { err = errs.Combine(err, indexRows.Close()) }()
 
+		type indexInfo struct {
+			name  *string
+			seqno int
+			cid   int
+		}
+
+		var indexInfos []indexInfo
 		for indexRows.Next() {
-			var name *string
-			var seqno, cid int
-			err := indexRows.Scan(&seqno, &cid, &name)
+			var info indexInfo
+			err := indexRows.Scan(&info.seqno, &info.cid, &info.name)
 			if err != nil {
 				return errs.Wrap(err)
 			}
-			if name != nil {
-				index.Columns = append(index.Columns, *name)
-			} else {
-				matches := rxIndexExpr.FindStringSubmatch(definition.sql)
-				index.Columns = append(index.Columns, matches[1])
-			}
+			indexInfos = append(indexInfos, info)
 		}
 
-		matches := rxIndexTable.FindStringSubmatch(definition.sql)
-		index.Table = strings.TrimSpace(matches[1])
+		sort.SliceStable(indexInfos, func(i, j int) bool {
+			return indexInfos[i].seqno < indexInfos[j].seqno
+		})
+
+		sqlDef := definition.sql
+
+		var parsedColumns []string
+		parseColumns := func() []string {
+			if parsedColumns != nil {
+				return parsedColumns
+			}
+
+			var base string
+			if matches := rxIndexExpr.FindStringSubmatchIndex(strings.ToUpper(sqlDef)); len(matches) > 0 {
+				base = sqlDef[matches[2]:matches[3]]
+			}
+
+			parsedColumns = strings.Split(base, ",")
+			return parsedColumns
+		}
+
+		for _, info := range indexInfos {
+			if info.name != nil {
+				index.Columns = append(index.Columns, *info.name)
+				continue
+			}
+
+			if info.cid == -1 {
+				index.Columns = append(index.Columns, "rowid")
+			} else if info.cid == -2 {
+				index.Columns = append(index.Columns, parseColumns()[info.seqno])
+			}
+		}
+
+		// unique
+		if strings.Contains(definition.sql, "CREATE UNIQUE INDEX") {
+			index.Unique = true
+		}
 
+		// partial
 		if matches := rxIndexPartial.FindStringSubmatch(definition.sql); len(matches) > 0 {
 			index.Partial = strings.TrimSpace(matches[1])
 		}
@@ -192,9 +233,6 @@ var (
 	// matches UNIQUE (a,b)
 	rxUnique = regexp.MustCompile(`UNIQUE\s*\((.*?)\)`)
 
-	// matches ON (a,b)
-	rxIndexTable = regexp.MustCompile(`ON\s*([^(]*)\(`)
-
 	// matches ON table(expr)
 	rxIndexExpr = regexp.MustCompile(`ON\s*[^(]*\((.*)\)`)
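The rewritten discovery leans on PRAGMA index_info, whose rows are (seqno, cid, name). For ordinary columns name is the column name; for the rowid cid is -1, and for an expression column cid is -2 with a NULL name, which is why the column list then has to be recovered from the CREATE INDEX statement via rxIndexExpr. A standalone sketch of what the pragma returns, assuming the mattn/go-sqlite3 driver and an invented table/index:

package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/mattn/go-sqlite3" // driver choice is an assumption; any sqlite driver works
)

func main() {
	db, err := sql.Open("sqlite3", ":memory:")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// An index over an expression: index_info reports cid = -2 and a NULL name
	// for that column, so the column cannot be recovered from the pragma alone.
	if _, err := db.Exec(`CREATE TABLE example (id INTEGER, payload TEXT)`); err != nil {
		log.Fatal(err)
	}
	if _, err := db.Exec(`CREATE UNIQUE INDEX idx_example_lower ON example(lower(payload))`); err != nil {
		log.Fatal(err)
	}

	rows, err := db.Query(`PRAGMA index_info(idx_example_lower)`)
	if err != nil {
		log.Fatal(err)
	}
	defer rows.Close()

	for rows.Next() {
		var seqno, cid int
		var name *string // NULL for rowid (cid -1) and expression (cid -2) columns
		if err := rows.Scan(&seqno, &cid, &name); err != nil {
			log.Fatal(err)
		}
		col := "<expression>"
		if name != nil {
			col = *name
		}
		fmt.Printf("seqno=%d cid=%d column=%s\n", seqno, cid, col)
	}
	if err := rows.Err(); err != nil {
		log.Fatal(err)
	}
}

Sorting the scanned rows by seqno before resolving column names keeps the discovered column order stable regardless of the order in which the pragma returns them.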


@@ -7,6 +7,7 @@ package storagenodedb
 import (
 	"context"
+	"fmt"
 	"os"
 	"path/filepath"
 	"time"
@@ -18,6 +19,7 @@ import (
 	"go.uber.org/zap"
 
 	"storj.io/storj/private/dbutil"
+	"storj.io/storj/private/dbutil/dbschema"
 	"storj.io/storj/private/dbutil/sqliteutil"
 	"storj.io/storj/private/migrate"
 	"storj.io/storj/private/tagsql"
@@ -316,8 +318,30 @@ func (db *DB) Preflight(ctx context.Context) (err error) {
 			schema.Indexes = nil
 		}
 
-		// get expected schema and expect it to match actual schema
+		// get expected schema
 		expectedSchema := Schema()[dbName]
+
+		// find extra indexes
+		var extraIdxs []*dbschema.Index
+		for _, idx := range schema.Indexes {
+			if _, exists := expectedSchema.FindIndex(idx.Name); exists {
+				continue
+			}
+
+			extraIdxs = append(extraIdxs, idx)
+		}
+		// drop index from schema if it is not unique to not fail preflight
+		for _, idx := range extraIdxs {
+			if !idx.Unique {
+				schema.DropIndex(idx.Name)
+			}
+		}
+		// warn that schema contains unexpected indexes
+		if len(extraIdxs) > 0 {
+			db.log.Warn(fmt.Sprintf("%s: schema contains unexpected indices %v", dbName, extraIdxs))
+		}
+
+		// expect expected schema to match actual schema
 		if diff := cmp.Diff(expectedSchema, schema); diff != "" {
 			return ErrPreflight.New("%s: expected schema does not match actual: %s", dbName, diff)
 		}
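Taken together with the new dbschema helpers, this means an unexpected non-unique index in a storagenode database only produces a warning, while an unexpected unique index (or any other schema difference) still fails preflight. A sketch of that filtering in isolation; filterUnexpectedIndexes and the index names are illustrative, not part of the codebase:

package main

import (
	"fmt"

	"storj.io/storj/private/dbutil/dbschema"
)

// filterUnexpectedIndexes mirrors the preflight logic above: indexes missing from
// the expected schema are collected, the non-unique ones are dropped from the
// observed schema so they only warn, and unexpected unique indexes are kept so
// they show up in the later cmp.Diff.
func filterUnexpectedIndexes(expected, actual *dbschema.Schema) (extra []*dbschema.Index) {
	for _, idx := range actual.Indexes {
		if _, ok := expected.FindIndex(idx.Name); ok {
			continue
		}
		extra = append(extra, idx)
	}
	for _, idx := range extra {
		if !idx.Unique {
			actual.DropIndex(idx.Name)
		}
	}
	return extra
}

func main() {
	expected := &dbschema.Schema{}
	actual := &dbschema.Schema{
		Indexes: []*dbschema.Index{
			{Name: "idx_leftover", Table: "orders", Columns: []string{"id"}},                  // non-unique: dropped, warn only
			{Name: "idx_conflicting", Table: "orders", Columns: []string{"id"}, Unique: true}, // unique: kept, diff fails
		},
	}

	extra := filterUnexpectedIndexes(expected, actual)
	fmt.Println("unexpected:", len(extra), "still diffed:", len(actual.Indexes)) // unexpected: 2 still diffed: 1
}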


@@ -310,7 +310,7 @@ func Schema() map[string]*dbschema.Schema {
 			},
 			Indexes: []*dbschema.Index{
 				&dbschema.Index{Name: "idx_order_archived_at", Table: "order_archive_", Columns: []string{"archived_at"}, Unique: false, Partial: ""},
-				&dbschema.Index{Name: "idx_orders", Table: "unsent_order", Columns: []string{"satellite_id", "serial_number"}, Unique: false, Partial: ""},
+				&dbschema.Index{Name: "idx_orders", Table: "unsent_order", Columns: []string{"satellite_id", "serial_number"}, Unique: true, Partial: ""},
 			},
 		},
 		"piece_expiration": &dbschema.Schema{
@@ -377,7 +377,7 @@ func Schema() map[string]*dbschema.Schema {
 				},
 			},
 			Indexes: []*dbschema.Index{
-				&dbschema.Index{Name: "idx_piece_space_used_satellite_id", Table: "piece_space_used", Columns: []string{"satellite_id"}, Unique: false, Partial: ""},
+				&dbschema.Index{Name: "idx_piece_space_used_satellite_id", Table: "piece_space_used", Columns: []string{"satellite_id"}, Unique: true, Partial: ""},
 			},
 		},
 		"pieceinfo": &dbschema.Schema{
@@ -436,7 +436,7 @@ func Schema() map[string]*dbschema.Schema {
 			},
 			Indexes: []*dbschema.Index{
 				&dbschema.Index{Name: "idx_pieceinfo__expiration", Table: "pieceinfo_", Columns: []string{"piece_expiration"}, Unique: false, Partial: "piece_expiration IS NOT NULL"},
-				&dbschema.Index{Name: "pk_pieceinfo_", Table: "pieceinfo_", Columns: []string{"satellite_id", "piece_id"}, Unique: false, Partial: ""},
+				&dbschema.Index{Name: "pk_pieceinfo_", Table: "pieceinfo_", Columns: []string{"satellite_id", "piece_id"}, Unique: true, Partial: ""},
 			},
 		},
 		"pricing": &dbschema.Schema{