satellite/metainfo/metabase: use pgx.Batch to delete expired segments
Change-Id: I51eeaadeeef0ca2faabac4b38158278aea177eb8
This commit is contained in:
parent
b2be1f1629
commit
6a805b2891
@ -92,6 +92,9 @@ type cockroachConn struct {
|
|||||||
// Assert that cockroachConn fulfills connAll.
|
// Assert that cockroachConn fulfills connAll.
|
||||||
var _ connAll = (*cockroachConn)(nil)
|
var _ connAll = (*cockroachConn)(nil)
|
||||||
|
|
||||||
|
// StdlibConn returns the underlying pgx std connection.
|
||||||
|
func (c *cockroachConn) StdlibConn() *stdlib.Conn { return c.underlying }
|
||||||
|
|
||||||
// Close closes the cockroachConn.
|
// Close closes the cockroachConn.
|
||||||
func (c *cockroachConn) Close() error {
|
func (c *cockroachConn) Close() error {
|
||||||
return c.underlying.Close()
|
return c.underlying.Close()
|
||||||
|
@ -9,11 +9,12 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/jackc/pgx/v4"
|
||||||
|
"github.com/jackc/pgx/v4/stdlib"
|
||||||
|
"github.com/zeebo/errs"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
|
||||||
"storj.io/common/uuid"
|
|
||||||
"storj.io/storj/private/dbutil"
|
"storj.io/storj/private/dbutil"
|
||||||
"storj.io/storj/private/dbutil/pgutil"
|
|
||||||
"storj.io/storj/private/tagsql"
|
"storj.io/storj/private/tagsql"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -117,47 +118,60 @@ func (db *DB) deleteExpiredObjects(ctx context.Context, expiredObjects []ObjectS
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
projectIds := make([]uuid.UUID, len(expiredObjects))
|
conn, err := db.db.Conn(ctx)
|
||||||
buckets := make([][]byte, len(expiredObjects))
|
if err != nil {
|
||||||
objectKeys := make([][]byte, len(expiredObjects))
|
return Error.New("unable to get the raw conn: %w", err)
|
||||||
versions := make([]int32, len(expiredObjects))
|
|
||||||
streamIDs := make([]uuid.UUID, len(expiredObjects))
|
|
||||||
|
|
||||||
for i, object := range expiredObjects {
|
|
||||||
projectIds[i] = object.ProjectID
|
|
||||||
buckets[i] = []byte(object.BucketName)
|
|
||||||
objectKeys[i] = []byte(object.ObjectKey)
|
|
||||||
versions[i] = int32(object.Version)
|
|
||||||
streamIDs[i] = object.StreamID
|
|
||||||
}
|
}
|
||||||
query := `
|
defer func() { err = errs.Combine(err, conn.Close()) }()
|
||||||
WITH deleted_objects AS (
|
|
||||||
DELETE FROM objects
|
|
||||||
WHERE
|
|
||||||
(project_id, bucket_name, object_key, version, stream_id) IN (
|
|
||||||
SELECT
|
|
||||||
unnest($1::BYTEA[]),
|
|
||||||
unnest($2::BYTEA[]),
|
|
||||||
unnest($3::BYTEA[]),
|
|
||||||
unnest($4::INT4[]),
|
|
||||||
unnest($5::BYTEA[])
|
|
||||||
)
|
|
||||||
RETURNING 1
|
|
||||||
)
|
|
||||||
DELETE FROM segments
|
|
||||||
WHERE segments.stream_id = ANY($5::BYTEA[])
|
|
||||||
`
|
|
||||||
_, err = db.db.ExecContext(ctx,
|
|
||||||
query,
|
|
||||||
pgutil.UUIDArray(projectIds),
|
|
||||||
pgutil.ByteaArray(buckets),
|
|
||||||
pgutil.ByteaArray(objectKeys),
|
|
||||||
pgutil.Int4Array(versions),
|
|
||||||
pgutil.UUIDArray(streamIDs),
|
|
||||||
)
|
|
||||||
|
|
||||||
|
err = conn.Raw(ctx, func(driverConn interface{}) (err error) {
|
||||||
|
defer func() {
|
||||||
|
if closable, ok := driverConn.(interface{ Close() error }); ok {
|
||||||
|
err = errs.Combine(err, closable.Close())
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
var pgxconn *pgx.Conn
|
||||||
|
switch conn := driverConn.(type) {
|
||||||
|
case interface{ StdlibConn() *stdlib.Conn }:
|
||||||
|
pgxconn = conn.StdlibConn().Conn()
|
||||||
|
case *stdlib.Conn:
|
||||||
|
pgxconn = conn.Conn()
|
||||||
|
default:
|
||||||
|
return Error.New("invalid raw conn driver %T", driverConn)
|
||||||
|
}
|
||||||
|
|
||||||
|
var batch pgx.Batch
|
||||||
|
for _, obj := range expiredObjects {
|
||||||
|
obj := obj
|
||||||
|
|
||||||
|
batch.Queue(`START TRANSACTION`)
|
||||||
|
batch.Queue(`
|
||||||
|
DELETE FROM objects
|
||||||
|
WHERE (project_id, bucket_name, object_key, version) = ($1::BYTEA, $2::BYTEA, $3::BYTEA, $4)
|
||||||
|
AND stream_id = $5::BYTEA
|
||||||
|
`, obj.ProjectID, []byte(obj.BucketName), []byte(obj.ObjectKey), obj.Version, obj.StreamID)
|
||||||
|
batch.Queue(`
|
||||||
|
DELETE FROM segments
|
||||||
|
WHERE segments.stream_id = $1::BYTEA
|
||||||
|
`, obj.StreamID)
|
||||||
|
batch.Queue(`COMMIT TRANSACTION`)
|
||||||
|
}
|
||||||
|
|
||||||
|
results := pgxconn.SendBatch(ctx, &batch)
|
||||||
|
defer func() { err = errs.Combine(err, results.Close()) }()
|
||||||
|
|
||||||
|
var errlist errs.Group
|
||||||
|
for i := 0; i < batch.Len(); i++ {
|
||||||
|
_, err := results.Exec()
|
||||||
|
errlist.Add(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return errlist.Err()
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return Error.New("unable to delete expired objects: %w", err)
|
return Error.New("unable to delete expired objects: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user