satellite/metabase: FinishMoveObject segment query improved
Change-Id: I198033d5763e6f386d3090fed49edcbfd95ff536
This commit is contained in:
parent
71eb184ef3
commit
cae08d816c
@ -10,6 +10,7 @@ import (
|
||||
|
||||
"storj.io/common/storj"
|
||||
"storj.io/common/uuid"
|
||||
"storj.io/private/dbutil/pgutil"
|
||||
"storj.io/private/dbutil/txutil"
|
||||
"storj.io/private/tagsql"
|
||||
)
|
||||
@ -171,29 +172,38 @@ func (db *DB) FinishMoveObject(ctx context.Context, opts FinishMoveObject) (err
|
||||
return ErrInvalidRequest.New("wrong amount of segments keys received")
|
||||
}
|
||||
|
||||
updateSegmentsQuery := `
|
||||
UPDATE segments SET
|
||||
encrypted_key_nonce = $1,
|
||||
encrypted_key = $2
|
||||
WHERE
|
||||
stream_id = $3 AND
|
||||
position = $4
|
||||
`
|
||||
var newSegmentKeys struct {
|
||||
Positions []int64
|
||||
EncryptedKeys [][]byte
|
||||
EncryptedKeyNonces [][]byte
|
||||
}
|
||||
|
||||
for _, keyAndNonce := range opts.NewSegmentKeys {
|
||||
updateResult, err := db.db.ExecContext(ctx, updateSegmentsQuery, keyAndNonce.EncryptedKeyNonce, keyAndNonce.EncryptedKey, opts.StreamID, keyAndNonce.Position)
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
for _, u := range opts.NewSegmentKeys {
|
||||
newSegmentKeys.EncryptedKeys = append(newSegmentKeys.EncryptedKeys, u.EncryptedKey)
|
||||
newSegmentKeys.EncryptedKeyNonces = append(newSegmentKeys.EncryptedKeyNonces, u.EncryptedKeyNonce)
|
||||
newSegmentKeys.Positions = append(newSegmentKeys.Positions, int64(u.Position.Encode()))
|
||||
}
|
||||
|
||||
affected, err := updateResult.RowsAffected()
|
||||
if err != nil {
|
||||
return Error.New("failed to get rows affected: %w", err)
|
||||
}
|
||||
updateResult, err := db.db.ExecContext(ctx, `
|
||||
UPDATE segments SET
|
||||
encrypted_key_nonce = P.encrypted_key_nonce,
|
||||
encrypted_key = P.encrypted_key
|
||||
FROM (SELECT unnest($2::INT8[]), unnest($3::BYTEA[]), unnest($4::BYTEA[])) as P(position, encrypted_key_nonce, encrypted_key)
|
||||
WHERE
|
||||
stream_id = $1 AND
|
||||
segments.position = P.position
|
||||
`, opts.StreamID, pgutil.Int8Array(newSegmentKeys.Positions), pgutil.ByteaArray(newSegmentKeys.EncryptedKeyNonces), pgutil.ByteaArray(newSegmentKeys.EncryptedKeys))
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
|
||||
if affected != 1 {
|
||||
return Error.New("segment is missing")
|
||||
}
|
||||
affected, err := updateResult.RowsAffected()
|
||||
if err != nil {
|
||||
return Error.New("failed to get rows affected: %w", err)
|
||||
}
|
||||
|
||||
if affected != int64(len(newSegmentKeys.Positions)) {
|
||||
return Error.New("segment is missing")
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
Loading…
Reference in New Issue
Block a user