mirror of
https://github.com/grafana/grafana.git
synced 2025-12-21 20:24:41 +08:00
Compare commits
2 Commits
docs/add-t
...
kvstore-tx
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6e39b24b6f | ||
|
|
c31c1d8e8d |
@@ -35,6 +35,109 @@ type ListOptions struct {
|
|||||||
Limit int64 // maximum number of results to return. 0 means no limit.
|
Limit int64 // maximum number of results to return. 0 means no limit.
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CompareTarget specifies what to compare in a transaction
|
||||||
|
type CompareTarget int
|
||||||
|
|
||||||
|
const (
|
||||||
|
CompareExists CompareTarget = iota // Check if key exists (Value: bool)
|
||||||
|
CompareValue // Compare actual value (Value: []byte)
|
||||||
|
)
|
||||||
|
|
||||||
|
// CompareResult specifies the comparison operator
|
||||||
|
type CompareResult int
|
||||||
|
|
||||||
|
const (
|
||||||
|
CompareEqual CompareResult = iota
|
||||||
|
CompareNotEqual
|
||||||
|
CompareGreater
|
||||||
|
CompareLess
|
||||||
|
)
|
||||||
|
|
||||||
|
// Compare represents a single comparison in a transaction.
|
||||||
|
// Use the constructor functions CompareKeyExists, CompareKeyNotExists, and CompareKeyValue to create comparisons.
|
||||||
|
type Compare struct {
|
||||||
|
Key string
|
||||||
|
Target CompareTarget
|
||||||
|
Result CompareResult // Only used for CompareValue
|
||||||
|
Exists bool // Used when Target == CompareExists
|
||||||
|
Value []byte // Used when Target == CompareValue
|
||||||
|
}
|
||||||
|
|
||||||
|
// CompareKeyExists creates a comparison that succeeds if the key exists.
|
||||||
|
func CompareKeyExists(key string) Compare {
|
||||||
|
return Compare{Key: key, Target: CompareExists, Exists: true}
|
||||||
|
}
|
||||||
|
|
||||||
|
// CompareKeyNotExists creates a comparison that succeeds if the key does not exist.
|
||||||
|
func CompareKeyNotExists(key string) Compare {
|
||||||
|
return Compare{Key: key, Target: CompareExists, Exists: false}
|
||||||
|
}
|
||||||
|
|
||||||
|
// CompareKeyValue creates a comparison that compares the value of a key.
|
||||||
|
// The comparison succeeds if the stored value matches the expected value according to the result operator.
|
||||||
|
func CompareKeyValue(key string, result CompareResult, value []byte) Compare {
|
||||||
|
return Compare{Key: key, Target: CompareValue, Result: result, Value: value}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TxnOpType specifies the type of operation in a transaction
|
||||||
|
type TxnOpType int
|
||||||
|
|
||||||
|
const (
|
||||||
|
TxnOpPut TxnOpType = iota
|
||||||
|
TxnOpDelete
|
||||||
|
)
|
||||||
|
|
||||||
|
// TxnOp represents an operation in a transaction.
|
||||||
|
// Use the constructor functions TxnPut and TxnDelete to create operations.
|
||||||
|
type TxnOp struct {
|
||||||
|
Type TxnOpType
|
||||||
|
Key string
|
||||||
|
Value []byte // For Put operations
|
||||||
|
}
|
||||||
|
|
||||||
|
// TxnPut creates a Put operation that stores a value at the given key.
|
||||||
|
func TxnPut(key string, value []byte) TxnOp {
|
||||||
|
return TxnOp{Type: TxnOpPut, Key: key, Value: value}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TxnDelete creates a Delete operation that removes the given key.
|
||||||
|
func TxnDelete(key string) TxnOp {
|
||||||
|
return TxnOp{Type: TxnOpDelete, Key: key}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TxnResponse contains the result of a transaction
|
||||||
|
type TxnResponse struct {
|
||||||
|
// Succeeded indicates whether the comparisons passed (true) or failed (false)
|
||||||
|
Succeeded bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// Maximum limits for transaction operations
|
||||||
|
const (
|
||||||
|
MaxTxnCompares = 8
|
||||||
|
MaxTxnOps = 8
|
||||||
|
)
|
||||||
|
|
||||||
|
// ValidateTxnRequest validates the transaction request parameters.
|
||||||
|
func ValidateTxnRequest(section string, cmps []Compare, successOps []TxnOp, failureOps []TxnOp) error {
|
||||||
|
if section == "" {
|
||||||
|
return fmt.Errorf("section is required")
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(cmps) > MaxTxnCompares {
|
||||||
|
return fmt.Errorf("too many comparisons: %d > %d", len(cmps), MaxTxnCompares)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(successOps) > MaxTxnOps {
|
||||||
|
return fmt.Errorf("too many success operations: %d > %d", len(successOps), MaxTxnOps)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(failureOps) > MaxTxnOps {
|
||||||
|
return fmt.Errorf("too many failure operations: %d > %d", len(failureOps), MaxTxnOps)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
type KV interface {
|
type KV interface {
|
||||||
// Keys returns all the keys in the store
|
// Keys returns all the keys in the store
|
||||||
Keys(ctx context.Context, section string, opt ListOptions) iter.Seq2[string, error]
|
Keys(ctx context.Context, section string, opt ListOptions) iter.Seq2[string, error]
|
||||||
@@ -60,6 +163,11 @@ type KV interface {
|
|||||||
// UnixTimestamp returns the current time in seconds since Epoch.
|
// UnixTimestamp returns the current time in seconds since Epoch.
|
||||||
// This is used to ensure the server and client are not too far apart in time.
|
// This is used to ensure the server and client are not too far apart in time.
|
||||||
UnixTimestamp(ctx context.Context) (int64, error)
|
UnixTimestamp(ctx context.Context) (int64, error)
|
||||||
|
|
||||||
|
// Txn executes a transaction with compare-and-swap semantics.
|
||||||
|
// If all comparisons succeed, successOps are executed; otherwise failureOps are executed.
|
||||||
|
// Limited to MaxTxnCompares comparisons and MaxTxnOps operations each for success/failure.
|
||||||
|
Txn(ctx context.Context, section string, cmps []Compare, successOps []TxnOp, failureOps []TxnOp) (*TxnResponse, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ KV = &badgerKV{}
|
var _ KV = &badgerKV{}
|
||||||
@@ -360,3 +468,102 @@ func (k *badgerKV) BatchDelete(ctx context.Context, section string, keys []strin
|
|||||||
|
|
||||||
return txn.Commit()
|
return txn.Commit()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (k *badgerKV) Txn(ctx context.Context, section string, cmps []Compare, successOps []TxnOp, failureOps []TxnOp) (*TxnResponse, error) {
|
||||||
|
if k.db.IsClosed() {
|
||||||
|
return nil, fmt.Errorf("database is closed")
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := ValidateTxnRequest(section, cmps, successOps, failureOps); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
txn := k.db.NewTransaction(true)
|
||||||
|
defer txn.Discard()
|
||||||
|
|
||||||
|
// Evaluate all comparisons
|
||||||
|
succeeded := true
|
||||||
|
for _, cmp := range cmps {
|
||||||
|
keyWithSection := section + "/" + cmp.Key
|
||||||
|
item, err := txn.Get([]byte(keyWithSection))
|
||||||
|
keyExists := err == nil
|
||||||
|
if err != nil && !errors.Is(err, badger.ErrKeyNotFound) {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
switch cmp.Target {
|
||||||
|
case CompareExists:
|
||||||
|
if keyExists != cmp.Exists {
|
||||||
|
succeeded = false
|
||||||
|
}
|
||||||
|
|
||||||
|
case CompareValue:
|
||||||
|
if !keyExists {
|
||||||
|
// Key doesn't exist, comparison fails unless comparing for not-equal
|
||||||
|
if cmp.Result != CompareNotEqual {
|
||||||
|
succeeded = false
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
itemValue, err := item.ValueCopy(nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if !compareBytes(itemValue, cmp.Value, cmp.Result) {
|
||||||
|
succeeded = false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("unknown compare target: %d", cmp.Target)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !succeeded {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Execute the appropriate operations
|
||||||
|
ops := successOps
|
||||||
|
if !succeeded {
|
||||||
|
ops = failureOps
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, op := range ops {
|
||||||
|
keyWithSection := section + "/" + op.Key
|
||||||
|
switch op.Type {
|
||||||
|
case TxnOpPut:
|
||||||
|
if err := txn.Set([]byte(keyWithSection), op.Value); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
case TxnOpDelete:
|
||||||
|
if err := txn.Delete([]byte(keyWithSection)); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("unknown operation type: %d", op.Type)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := txn.Commit(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &TxnResponse{Succeeded: succeeded}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// compareBytes compares two byte slices based on the comparison result type
|
||||||
|
func compareBytes(a, b []byte, result CompareResult) bool {
|
||||||
|
cmp := bytes.Compare(a, b)
|
||||||
|
switch result {
|
||||||
|
case CompareEqual:
|
||||||
|
return cmp == 0
|
||||||
|
case CompareNotEqual:
|
||||||
|
return cmp != 0
|
||||||
|
case CompareGreater:
|
||||||
|
return cmp > 0
|
||||||
|
case CompareLess:
|
||||||
|
return cmp < 0
|
||||||
|
default:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -28,6 +28,7 @@ const (
|
|||||||
TestKVUnixTimestamp = "unix timestamp"
|
TestKVUnixTimestamp = "unix timestamp"
|
||||||
TestKVBatchGet = "batch get operations"
|
TestKVBatchGet = "batch get operations"
|
||||||
TestKVBatchDelete = "batch delete operations"
|
TestKVBatchDelete = "batch delete operations"
|
||||||
|
TestKVTxn = "transaction operations"
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewKVFunc is a function that creates a new KV instance for testing
|
// NewKVFunc is a function that creates a new KV instance for testing
|
||||||
@@ -69,6 +70,7 @@ func RunKVTest(t *testing.T, newKV NewKVFunc, opts *KVTestOptions) {
|
|||||||
{TestKVUnixTimestamp, runTestKVUnixTimestamp},
|
{TestKVUnixTimestamp, runTestKVUnixTimestamp},
|
||||||
{TestKVBatchGet, runTestKVBatchGet},
|
{TestKVBatchGet, runTestKVBatchGet},
|
||||||
{TestKVBatchDelete, runTestKVBatchDelete},
|
{TestKVBatchDelete, runTestKVBatchDelete},
|
||||||
|
{TestKVTxn, runTestKVTxn},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range cases {
|
for _, tc := range cases {
|
||||||
@@ -801,3 +803,371 @@ func saveKVHelper(t *testing.T, kv resource.KV, ctx context.Context, section, ke
|
|||||||
err = writer.Close()
|
err = writer.Close()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func runTestKVTxn(t *testing.T, kv resource.KV, nsPrefix string) {
|
||||||
|
ctx := testutil.NewTestContext(t, time.Now().Add(30*time.Second))
|
||||||
|
section := nsPrefix + "-txn"
|
||||||
|
|
||||||
|
t.Run("txn with empty section", func(t *testing.T) {
|
||||||
|
_, err := kv.Txn(ctx, "", nil, nil, nil)
|
||||||
|
assert.Error(t, err)
|
||||||
|
assert.Contains(t, err.Error(), "section is required")
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("txn with no comparisons executes success ops", func(t *testing.T) {
|
||||||
|
// With no comparisons, all comparisons "pass" so success ops should run
|
||||||
|
successOps := []resource.TxnOp{
|
||||||
|
{Type: resource.TxnOpPut, Key: "no-cmp-key", Value: []byte("success-value")},
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := kv.Txn(ctx, section, nil, successOps, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.True(t, resp.Succeeded)
|
||||||
|
|
||||||
|
// Verify the key was created
|
||||||
|
reader, err := kv.Get(ctx, section, "no-cmp-key")
|
||||||
|
require.NoError(t, err)
|
||||||
|
value, err := io.ReadAll(reader)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, "success-value", string(value))
|
||||||
|
err = reader.Close()
|
||||||
|
require.NoError(t, err)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("txn compare exists false for non-existent key", func(t *testing.T) {
|
||||||
|
// Key doesn't exist, so exists should be false
|
||||||
|
cmps := []resource.Compare{
|
||||||
|
resource.CompareKeyNotExists("non-existent-key"),
|
||||||
|
}
|
||||||
|
successOps := []resource.TxnOp{
|
||||||
|
{Type: resource.TxnOpPut, Key: "created-key", Value: []byte("created-value")},
|
||||||
|
}
|
||||||
|
failureOps := []resource.TxnOp{
|
||||||
|
{Type: resource.TxnOpPut, Key: "non-existent-failure-marker", Value: []byte("failure-executed")},
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := kv.Txn(ctx, section, cmps, successOps, failureOps)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.True(t, resp.Succeeded)
|
||||||
|
|
||||||
|
// Verify the key was created
|
||||||
|
reader, err := kv.Get(ctx, section, "created-key")
|
||||||
|
require.NoError(t, err)
|
||||||
|
value, err := io.ReadAll(reader)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, "created-value", string(value))
|
||||||
|
err = reader.Close()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Verify failure op was not executed
|
||||||
|
_, err = kv.Get(ctx, section, "non-existent-failure-marker")
|
||||||
|
assert.Error(t, err)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("txn compare exists true for existing key", func(t *testing.T) {
|
||||||
|
// First create a key
|
||||||
|
saveKVHelper(t, kv, ctx, section, "existing-key", strings.NewReader("existing-value"))
|
||||||
|
|
||||||
|
// Key exists, so exists should be true
|
||||||
|
cmps := []resource.Compare{
|
||||||
|
resource.CompareKeyExists("existing-key"),
|
||||||
|
}
|
||||||
|
successOps := []resource.TxnOp{
|
||||||
|
{Type: resource.TxnOpPut, Key: "existing-key", Value: []byte("updated-value")},
|
||||||
|
}
|
||||||
|
failureOps := []resource.TxnOp{
|
||||||
|
{Type: resource.TxnOpPut, Key: "existing-failure-marker", Value: []byte("failure-executed")},
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := kv.Txn(ctx, section, cmps, successOps, failureOps)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.True(t, resp.Succeeded)
|
||||||
|
|
||||||
|
// Verify the key was updated
|
||||||
|
reader, err := kv.Get(ctx, section, "existing-key")
|
||||||
|
require.NoError(t, err)
|
||||||
|
value, err := io.ReadAll(reader)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, "updated-value", string(value))
|
||||||
|
err = reader.Close()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Verify failure op was not executed
|
||||||
|
_, err = kv.Get(ctx, section, "existing-failure-marker")
|
||||||
|
assert.Error(t, err)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("txn compare exists fails when key exists but expected not to", func(t *testing.T) {
|
||||||
|
// First create a key
|
||||||
|
saveKVHelper(t, kv, ctx, section, "exists-fail-key", strings.NewReader("some-value"))
|
||||||
|
|
||||||
|
// Key exists but we expect it not to
|
||||||
|
cmps := []resource.Compare{
|
||||||
|
resource.CompareKeyNotExists("exists-fail-key"),
|
||||||
|
}
|
||||||
|
successOps := []resource.TxnOp{
|
||||||
|
{Type: resource.TxnOpPut, Key: "exists-fail-result", Value: []byte("should-not-exist")},
|
||||||
|
}
|
||||||
|
failureOps := []resource.TxnOp{
|
||||||
|
{Type: resource.TxnOpPut, Key: "exists-fail-marker", Value: []byte("failure-executed")},
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := kv.Txn(ctx, section, cmps, successOps, failureOps)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.False(t, resp.Succeeded)
|
||||||
|
|
||||||
|
// Verify success op was not executed
|
||||||
|
_, err = kv.Get(ctx, section, "exists-fail-result")
|
||||||
|
assert.Error(t, err)
|
||||||
|
assert.Equal(t, resource.ErrNotFound, err)
|
||||||
|
|
||||||
|
// Verify failure ops were executed
|
||||||
|
reader, err := kv.Get(ctx, section, "exists-fail-marker")
|
||||||
|
require.NoError(t, err)
|
||||||
|
value, err := io.ReadAll(reader)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, "failure-executed", string(value))
|
||||||
|
err = reader.Close()
|
||||||
|
require.NoError(t, err)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("txn compare value equals", func(t *testing.T) {
|
||||||
|
// First create a key with known value
|
||||||
|
saveKVHelper(t, kv, ctx, section, "value-cmp-key", strings.NewReader("expected-value"))
|
||||||
|
|
||||||
|
cmps := []resource.Compare{
|
||||||
|
resource.CompareKeyValue("value-cmp-key", resource.CompareEqual, []byte("expected-value")),
|
||||||
|
}
|
||||||
|
successOps := []resource.TxnOp{
|
||||||
|
{Type: resource.TxnOpPut, Key: "value-cmp-key", Value: []byte("new-value")},
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := kv.Txn(ctx, section, cmps, successOps, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.True(t, resp.Succeeded)
|
||||||
|
|
||||||
|
// Verify the key was updated
|
||||||
|
reader, err := kv.Get(ctx, section, "value-cmp-key")
|
||||||
|
require.NoError(t, err)
|
||||||
|
value, err := io.ReadAll(reader)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, "new-value", string(value))
|
||||||
|
err = reader.Close()
|
||||||
|
require.NoError(t, err)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("txn compare value fails executes failure ops", func(t *testing.T) {
|
||||||
|
// First create a key with known value
|
||||||
|
saveKVHelper(t, kv, ctx, section, "value-fail-key", strings.NewReader("actual-value"))
|
||||||
|
|
||||||
|
cmps := []resource.Compare{
|
||||||
|
resource.CompareKeyValue("value-fail-key", resource.CompareEqual, []byte("wrong-value")),
|
||||||
|
}
|
||||||
|
successOps := []resource.TxnOp{
|
||||||
|
{Type: resource.TxnOpPut, Key: "value-fail-key", Value: []byte("should-not-be-set")},
|
||||||
|
}
|
||||||
|
failureOps := []resource.TxnOp{
|
||||||
|
{Type: resource.TxnOpPut, Key: "failure-marker", Value: []byte("failure-executed")},
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := kv.Txn(ctx, section, cmps, successOps, failureOps)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.False(t, resp.Succeeded)
|
||||||
|
|
||||||
|
// Verify the original key was not changed
|
||||||
|
reader, err := kv.Get(ctx, section, "value-fail-key")
|
||||||
|
require.NoError(t, err)
|
||||||
|
value, err := io.ReadAll(reader)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, "actual-value", string(value))
|
||||||
|
err = reader.Close()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Verify failure ops were executed
|
||||||
|
reader, err = kv.Get(ctx, section, "failure-marker")
|
||||||
|
require.NoError(t, err)
|
||||||
|
value, err = io.ReadAll(reader)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, "failure-executed", string(value))
|
||||||
|
err = reader.Close()
|
||||||
|
require.NoError(t, err)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("txn delete operation", func(t *testing.T) {
|
||||||
|
// First create a key
|
||||||
|
saveKVHelper(t, kv, ctx, section, "delete-txn-key", strings.NewReader("to-be-deleted"))
|
||||||
|
|
||||||
|
cmps := []resource.Compare{
|
||||||
|
resource.CompareKeyExists("delete-txn-key"),
|
||||||
|
}
|
||||||
|
successOps := []resource.TxnOp{
|
||||||
|
{Type: resource.TxnOpDelete, Key: "delete-txn-key"},
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := kv.Txn(ctx, section, cmps, successOps, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.True(t, resp.Succeeded)
|
||||||
|
|
||||||
|
// Verify the key was deleted
|
||||||
|
_, err = kv.Get(ctx, section, "delete-txn-key")
|
||||||
|
assert.Error(t, err)
|
||||||
|
assert.Equal(t, resource.ErrNotFound, err)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("txn multiple comparisons all must pass", func(t *testing.T) {
|
||||||
|
// Create two keys
|
||||||
|
saveKVHelper(t, kv, ctx, section, "multi-cmp-key1", strings.NewReader("value1"))
|
||||||
|
saveKVHelper(t, kv, ctx, section, "multi-cmp-key2", strings.NewReader("value2"))
|
||||||
|
|
||||||
|
cmps := []resource.Compare{
|
||||||
|
resource.CompareKeyValue("multi-cmp-key1", resource.CompareEqual, []byte("value1")),
|
||||||
|
resource.CompareKeyValue("multi-cmp-key2", resource.CompareEqual, []byte("value2")),
|
||||||
|
}
|
||||||
|
successOps := []resource.TxnOp{
|
||||||
|
{Type: resource.TxnOpPut, Key: "multi-success", Value: []byte("both-passed")},
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := kv.Txn(ctx, section, cmps, successOps, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.True(t, resp.Succeeded)
|
||||||
|
|
||||||
|
// Verify success op was executed
|
||||||
|
reader, err := kv.Get(ctx, section, "multi-success")
|
||||||
|
require.NoError(t, err)
|
||||||
|
value, err := io.ReadAll(reader)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, "both-passed", string(value))
|
||||||
|
err = reader.Close()
|
||||||
|
require.NoError(t, err)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("txn multiple comparisons one fails", func(t *testing.T) {
|
||||||
|
// Create two keys
|
||||||
|
saveKVHelper(t, kv, ctx, section, "multi-fail-key1", strings.NewReader("value1"))
|
||||||
|
saveKVHelper(t, kv, ctx, section, "multi-fail-key2", strings.NewReader("value2"))
|
||||||
|
|
||||||
|
cmps := []resource.Compare{
|
||||||
|
resource.CompareKeyValue("multi-fail-key1", resource.CompareEqual, []byte("value1")),
|
||||||
|
resource.CompareKeyValue("multi-fail-key2", resource.CompareEqual, []byte("wrong-value")),
|
||||||
|
}
|
||||||
|
successOps := []resource.TxnOp{
|
||||||
|
{Type: resource.TxnOpPut, Key: "multi-fail-success", Value: []byte("should-not-exist")},
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := kv.Txn(ctx, section, cmps, successOps, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.False(t, resp.Succeeded)
|
||||||
|
|
||||||
|
// Verify success op was not executed
|
||||||
|
_, err = kv.Get(ctx, section, "multi-fail-success")
|
||||||
|
assert.Error(t, err)
|
||||||
|
assert.Equal(t, resource.ErrNotFound, err)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("txn too many comparisons", func(t *testing.T) {
|
||||||
|
cmps := make([]resource.Compare, resource.MaxTxnCompares+1)
|
||||||
|
for i := range cmps {
|
||||||
|
cmps[i] = resource.CompareKeyNotExists(fmt.Sprintf("key-%d", i))
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err := kv.Txn(ctx, section, cmps, nil, nil)
|
||||||
|
assert.Error(t, err)
|
||||||
|
assert.Contains(t, err.Error(), "too many comparisons")
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("txn too many success operations", func(t *testing.T) {
|
||||||
|
ops := make([]resource.TxnOp, resource.MaxTxnOps+1)
|
||||||
|
for i := range ops {
|
||||||
|
ops[i] = resource.TxnOp{Type: resource.TxnOpPut, Key: fmt.Sprintf("key-%d", i), Value: []byte("value")}
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err := kv.Txn(ctx, section, nil, ops, nil)
|
||||||
|
assert.Error(t, err)
|
||||||
|
assert.Contains(t, err.Error(), "too many success operations")
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("txn too many failure operations", func(t *testing.T) {
|
||||||
|
ops := make([]resource.TxnOp, resource.MaxTxnOps+1)
|
||||||
|
for i := range ops {
|
||||||
|
ops[i] = resource.TxnOp{Type: resource.TxnOpPut, Key: fmt.Sprintf("key-%d", i), Value: []byte("value")}
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err := kv.Txn(ctx, section, nil, nil, ops)
|
||||||
|
assert.Error(t, err)
|
||||||
|
assert.Contains(t, err.Error(), "too many failure operations")
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("txn compare greater than", func(t *testing.T) {
|
||||||
|
saveKVHelper(t, kv, ctx, section, "greater-key", strings.NewReader("bbb"))
|
||||||
|
|
||||||
|
cmps := []resource.Compare{
|
||||||
|
resource.CompareKeyValue("greater-key", resource.CompareGreater, []byte("aaa")),
|
||||||
|
}
|
||||||
|
successOps := []resource.TxnOp{
|
||||||
|
{Type: resource.TxnOpPut, Key: "greater-result", Value: []byte("passed")},
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := kv.Txn(ctx, section, cmps, successOps, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.True(t, resp.Succeeded)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("txn compare less than", func(t *testing.T) {
|
||||||
|
saveKVHelper(t, kv, ctx, section, "less-key", strings.NewReader("aaa"))
|
||||||
|
|
||||||
|
cmps := []resource.Compare{
|
||||||
|
resource.CompareKeyValue("less-key", resource.CompareLess, []byte("bbb")),
|
||||||
|
}
|
||||||
|
successOps := []resource.TxnOp{
|
||||||
|
{Type: resource.TxnOpPut, Key: "less-result", Value: []byte("passed")},
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := kv.Txn(ctx, section, cmps, successOps, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.True(t, resp.Succeeded)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("txn compare not equal for non-existent value key", func(t *testing.T) {
|
||||||
|
// Key doesn't exist, comparing value should fail for equal but pass for not-equal
|
||||||
|
cmps := []resource.Compare{
|
||||||
|
resource.CompareKeyValue("non-existent-value-key", resource.CompareNotEqual, []byte("any-value")),
|
||||||
|
}
|
||||||
|
successOps := []resource.TxnOp{
|
||||||
|
resource.TxnPut("not-equal-result", []byte("passed")),
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := kv.Txn(ctx, section, cmps, successOps, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.True(t, resp.Succeeded)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("txn using constructor functions", func(t *testing.T) {
|
||||||
|
// Test that constructor functions work correctly
|
||||||
|
saveKVHelper(t, kv, ctx, section, "constructor-key", strings.NewReader("constructor-value"))
|
||||||
|
|
||||||
|
cmps := []resource.Compare{
|
||||||
|
resource.CompareKeyExists("constructor-key"),
|
||||||
|
}
|
||||||
|
successOps := []resource.TxnOp{
|
||||||
|
resource.TxnPut("constructor-new", []byte("new-value")),
|
||||||
|
resource.TxnDelete("constructor-key"),
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := kv.Txn(ctx, section, cmps, successOps, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.True(t, resp.Succeeded)
|
||||||
|
|
||||||
|
// Verify the operations actually happened
|
||||||
|
_, err = kv.Get(ctx, section, "constructor-key")
|
||||||
|
assert.Error(t, err)
|
||||||
|
assert.Equal(t, resource.ErrNotFound, err)
|
||||||
|
|
||||||
|
reader, err := kv.Get(ctx, section, "constructor-new")
|
||||||
|
require.NoError(t, err)
|
||||||
|
value, err := io.ReadAll(reader)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, "new-value", string(value))
|
||||||
|
err = reader.Close()
|
||||||
|
require.NoError(t, err)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user