Compare commits

...

5 Commits

Author SHA1 Message Date
Georges Chaudy
b1014dbbde add nested txn ? 2025-12-08 09:25:14 +01:00
Georges Chaudy
8e5db10b08 kvstore: add a test for concurrency 2025-12-05 09:54:33 +01:00
Georges Chaudy
01ee5d7be4 kvstore: add check for size 2025-12-05 09:13:55 +01:00
Georges Chaudy
6e39b24b6f Enhance comparison functions in KV storage
- Updated the CompareKeyExists function to simplify its usage by removing the 'exists' parameter, making it always succeed if the key exists.
- Introduced a new CompareKeyNotExists function to check for non-existent keys.
- Modified transaction tests to utilize the new comparison functions, ensuring accurate behavior for both existing and non-existing keys.
2025-12-04 16:17:43 +01:00
Georges Chaudy
c31c1d8e8d Add transaction support to KV storage
- Introduced transaction operations with compare-and-swap semantics.
- Added types and functions for comparisons (e.g., CompareExists, CompareValue) and transaction operations (e.g., TxnOpPut, TxnOpDelete).
- Implemented Txn method to execute transactions based on comparisons and success/failure operations.
- Added validation for transaction requests to enforce limits on comparisons and operations.
- Enhanced testing suite to cover various transaction scenarios, including success and failure cases, and edge conditions.

This update enhances the KV interface, allowing for more complex data manipulation while ensuring data integrity through transactional guarantees.
2025-12-03 11:33:22 +01:00
2 changed files with 904 additions and 0 deletions

View File

@@ -35,6 +35,193 @@ type ListOptions struct {
Limit int64 // maximum number of results to return. 0 means no limit.
}
// CompareTarget specifies what to compare in a transaction
type CompareTarget int
const (
CompareExists CompareTarget = iota // Check if key exists (Value: bool)
CompareValue // Compare actual value (Value: []byte)
)
// CompareResult specifies the comparison operator
type CompareResult int
const (
CompareEqual CompareResult = iota
CompareNotEqual
CompareGreater
CompareLess
)
// Compare represents a single comparison in a transaction.
// Use the constructor functions CompareKeyExists, CompareKeyNotExists, and CompareKeyValue to create comparisons.
type Compare struct {
Key string
Target CompareTarget
Result CompareResult // Only used for CompareValue
Exists bool // Used when Target == CompareExists
Value []byte // Used when Target == CompareValue
}
// CompareKeyExists creates a comparison that succeeds if the key exists.
func CompareKeyExists(key string) Compare {
return Compare{Key: key, Target: CompareExists, Exists: true}
}
// CompareKeyNotExists creates a comparison that succeeds if the key does not exist.
func CompareKeyNotExists(key string) Compare {
return Compare{Key: key, Target: CompareExists, Exists: false}
}
// CompareKeyValue creates a comparison that compares the value of a key.
// The comparison succeeds if the stored value matches the expected value according to the result operator.
func CompareKeyValue(key string, result CompareResult, value []byte) Compare {
return Compare{Key: key, Target: CompareValue, Result: result, Value: value}
}
// TxnOpType specifies the type of operation in a transaction
type TxnOpType int
const (
TxnOpPut TxnOpType = iota
TxnOpDelete
TxnOpTxn // Nested transaction
)
// TxnOp represents an operation in a transaction.
// Use the constructor functions TxnPut, TxnDelete, and TxnNested to create operations.
type TxnOp struct {
Type TxnOpType
Key string
Value []byte // For Put operations
// For nested transactions (Type == TxnOpTxn)
Compares []Compare
SuccessOps []TxnOp
FailureOps []TxnOp
}
// TxnPut creates a Put operation that stores a value at the given key.
func TxnPut(key string, value []byte) TxnOp {
return TxnOp{Type: TxnOpPut, Key: key, Value: value}
}
// TxnDelete creates a Delete operation that removes the given key.
func TxnDelete(key string) TxnOp {
return TxnOp{Type: TxnOpDelete, Key: key}
}
// TxnNested creates a nested transaction operation.
func TxnNested(compares []Compare, successOps []TxnOp, failureOps []TxnOp) TxnOp {
return TxnOp{
Type: TxnOpTxn,
Compares: compares,
SuccessOps: successOps,
FailureOps: failureOps,
}
}
// TxnResponse contains the result of a transaction
type TxnResponse struct {
// Succeeded indicates whether the comparisons passed (true) or failed (false)
Succeeded bool
}
// Maximum limits for transaction operations
const (
MaxTxnCompares = 8
MaxTxnOps = 8
MaxTxnTotalSize = 1 * 1024 * 1024 // 1MB total payload
MaxTxnDepth = 3 // Maximum nesting depth for transactions
)
// ValidateTxnRequest validates the transaction request parameters.
func ValidateTxnRequest(section string, cmps []Compare, successOps []TxnOp, failureOps []TxnOp) error {
// First validate structure
if err := validateTxnRequestWithDepth(section, cmps, successOps, failureOps, 1); err != nil {
return err
}
// Then check total payload size
totalSize := calculateTxnTotalSize(cmps, successOps, failureOps)
if totalSize > MaxTxnTotalSize {
return fmt.Errorf("total transaction payload too large: %d > %d", totalSize, MaxTxnTotalSize)
}
return nil
}
// calculateTxnTotalSize calculates the total size of all values in the transaction.
func calculateTxnTotalSize(cmps []Compare, successOps []TxnOp, failureOps []TxnOp) int {
total := 0
for _, cmp := range cmps {
total += len(cmp.Value)
}
total += calculateOpsTotalSize(successOps)
total += calculateOpsTotalSize(failureOps)
return total
}
// calculateOpsTotalSize calculates the total size of values in operations, including nested transactions.
func calculateOpsTotalSize(ops []TxnOp) int {
total := 0
for _, op := range ops {
total += len(op.Value)
if op.Type == TxnOpTxn {
total += calculateTxnTotalSize(op.Compares, op.SuccessOps, op.FailureOps)
}
}
return total
}
// validateTxnRequestWithDepth validates the transaction request with depth tracking.
func validateTxnRequestWithDepth(section string, cmps []Compare, successOps []TxnOp, failureOps []TxnOp, depth int) error {
if depth > MaxTxnDepth {
return fmt.Errorf("transaction nesting too deep: %d > %d", depth, MaxTxnDepth)
}
if section == "" {
return fmt.Errorf("section is required")
}
if len(cmps) > MaxTxnCompares {
return fmt.Errorf("too many comparisons: %d > %d", len(cmps), MaxTxnCompares)
}
if len(successOps) > MaxTxnOps {
return fmt.Errorf("too many success operations: %d > %d", len(successOps), MaxTxnOps)
}
if len(failureOps) > MaxTxnOps {
return fmt.Errorf("too many failure operations: %d > %d", len(failureOps), MaxTxnOps)
}
if err := validateOpsWithDepth(successOps, "success", section, depth); err != nil {
return err
}
if err := validateOpsWithDepth(failureOps, "failure", section, depth); err != nil {
return err
}
return nil
}
// validateOpsWithDepth validates operations including nested transactions.
func validateOpsWithDepth(ops []TxnOp, opType string, section string, depth int) error {
for i, op := range ops {
if op.Type == TxnOpTxn {
if err := validateTxnRequestWithDepth(section, op.Compares, op.SuccessOps, op.FailureOps, depth+1); err != nil {
return fmt.Errorf("%s operation %d nested txn: %w", opType, i, err)
}
}
}
return nil
}
type KV interface {
// Keys returns all the keys in the store
Keys(ctx context.Context, section string, opt ListOptions) iter.Seq2[string, error]
@@ -60,6 +247,11 @@ type KV interface {
// UnixTimestamp returns the current time in seconds since Epoch.
// This is used to ensure the server and client are not too far apart in time.
UnixTimestamp(ctx context.Context) (int64, error)
// Txn executes a transaction with compare-and-swap semantics.
// If all comparisons succeed, successOps are executed; otherwise failureOps are executed.
// Limited to MaxTxnCompares comparisons and MaxTxnOps operations each for success/failure.
Txn(ctx context.Context, section string, cmps []Compare, successOps []TxnOp, failureOps []TxnOp) (*TxnResponse, error)
}
var _ KV = &badgerKV{}
@@ -360,3 +552,122 @@ func (k *badgerKV) BatchDelete(ctx context.Context, section string, keys []strin
return txn.Commit()
}
func (k *badgerKV) Txn(ctx context.Context, section string, cmps []Compare, successOps []TxnOp, failureOps []TxnOp) (*TxnResponse, error) {
if k.db.IsClosed() {
return nil, fmt.Errorf("database is closed")
}
if err := ValidateTxnRequest(section, cmps, successOps, failureOps); err != nil {
return nil, err
}
txn := k.db.NewTransaction(true)
defer txn.Discard()
// Execute the transaction (potentially with nested transactions)
succeeded, err := k.executeTxnOps(txn, section, cmps, successOps, failureOps)
if err != nil {
return nil, err
}
if err := txn.Commit(); err != nil {
return nil, err
}
return &TxnResponse{Succeeded: succeeded}, nil
}
// executeTxnOps evaluates comparisons and executes operations, supporting nested transactions.
// Returns whether the comparisons succeeded and any error encountered.
func (k *badgerKV) executeTxnOps(txn *badger.Txn, section string, cmps []Compare, successOps []TxnOp, failureOps []TxnOp) (bool, error) {
// Evaluate all comparisons
succeeded := true
for _, cmp := range cmps {
keyWithSection := section + "/" + cmp.Key
item, err := txn.Get([]byte(keyWithSection))
keyExists := err == nil
if err != nil && !errors.Is(err, badger.ErrKeyNotFound) {
return false, err
}
switch cmp.Target {
case CompareExists:
if keyExists != cmp.Exists {
succeeded = false
}
case CompareValue:
if !keyExists {
// Key doesn't exist, comparison fails unless comparing for not-equal
if cmp.Result != CompareNotEqual {
succeeded = false
}
} else {
itemValue, err := item.ValueCopy(nil)
if err != nil {
return false, err
}
if !compareBytes(itemValue, cmp.Value, cmp.Result) {
succeeded = false
}
}
default:
return false, fmt.Errorf("unknown compare target: %d", cmp.Target)
}
if !succeeded {
break
}
}
// Execute the appropriate operations
ops := successOps
if !succeeded {
ops = failureOps
}
for _, op := range ops {
switch op.Type {
case TxnOpPut:
keyWithSection := section + "/" + op.Key
if err := txn.Set([]byte(keyWithSection), op.Value); err != nil {
return false, err
}
case TxnOpDelete:
keyWithSection := section + "/" + op.Key
if err := txn.Delete([]byte(keyWithSection)); err != nil {
return false, err
}
case TxnOpTxn:
// Execute nested transaction
_, err := k.executeTxnOps(txn, section, op.Compares, op.SuccessOps, op.FailureOps)
if err != nil {
return false, err
}
// Note: nested transaction's succeeded status doesn't affect parent's succeeded status
default:
return false, fmt.Errorf("unknown operation type: %d", op.Type)
}
}
return succeeded, nil
}
// compareBytes compares two byte slices based on the comparison result type
func compareBytes(a, b []byte, result CompareResult) bool {
cmp := bytes.Compare(a, b)
switch result {
case CompareEqual:
return cmp == 0
case CompareNotEqual:
return cmp != 0
case CompareGreater:
return cmp > 0
case CompareLess:
return cmp < 0
default:
return false
}
}

View File

@@ -28,6 +28,7 @@ const (
TestKVUnixTimestamp = "unix timestamp"
TestKVBatchGet = "batch get operations"
TestKVBatchDelete = "batch delete operations"
TestKVTxn = "transaction operations"
)
// NewKVFunc is a function that creates a new KV instance for testing
@@ -69,6 +70,7 @@ func RunKVTest(t *testing.T, newKV NewKVFunc, opts *KVTestOptions) {
{TestKVUnixTimestamp, runTestKVUnixTimestamp},
{TestKVBatchGet, runTestKVBatchGet},
{TestKVBatchDelete, runTestKVBatchDelete},
{TestKVTxn, runTestKVTxn},
}
for _, tc := range cases {
@@ -801,3 +803,594 @@ func saveKVHelper(t *testing.T, kv resource.KV, ctx context.Context, section, ke
err = writer.Close()
require.NoError(t, err)
}
func runTestKVTxn(t *testing.T, kv resource.KV, nsPrefix string) {
ctx := testutil.NewTestContext(t, time.Now().Add(30*time.Second))
section := nsPrefix + "-txn"
t.Run("txn with empty section", func(t *testing.T) {
_, err := kv.Txn(ctx, "", nil, nil, nil)
assert.Error(t, err)
assert.Contains(t, err.Error(), "section is required")
})
t.Run("txn with no comparisons executes success ops", func(t *testing.T) {
// With no comparisons, all comparisons "pass" so success ops should run
successOps := []resource.TxnOp{
{Type: resource.TxnOpPut, Key: "no-cmp-key", Value: []byte("success-value")},
}
resp, err := kv.Txn(ctx, section, nil, successOps, nil)
require.NoError(t, err)
assert.True(t, resp.Succeeded)
// Verify the key was created
reader, err := kv.Get(ctx, section, "no-cmp-key")
require.NoError(t, err)
value, err := io.ReadAll(reader)
require.NoError(t, err)
assert.Equal(t, "success-value", string(value))
err = reader.Close()
require.NoError(t, err)
})
t.Run("txn compare exists false for non-existent key", func(t *testing.T) {
// Key doesn't exist, so exists should be false
cmps := []resource.Compare{
resource.CompareKeyNotExists("non-existent-key"),
}
successOps := []resource.TxnOp{
{Type: resource.TxnOpPut, Key: "created-key", Value: []byte("created-value")},
}
failureOps := []resource.TxnOp{
{Type: resource.TxnOpPut, Key: "non-existent-failure-marker", Value: []byte("failure-executed")},
}
resp, err := kv.Txn(ctx, section, cmps, successOps, failureOps)
require.NoError(t, err)
assert.True(t, resp.Succeeded)
// Verify the key was created
reader, err := kv.Get(ctx, section, "created-key")
require.NoError(t, err)
value, err := io.ReadAll(reader)
require.NoError(t, err)
assert.Equal(t, "created-value", string(value))
err = reader.Close()
require.NoError(t, err)
// Verify failure op was not executed
_, err = kv.Get(ctx, section, "non-existent-failure-marker")
assert.Error(t, err)
})
t.Run("txn compare exists true for existing key", func(t *testing.T) {
// First create a key
saveKVHelper(t, kv, ctx, section, "existing-key", strings.NewReader("existing-value"))
// Key exists, so exists should be true
cmps := []resource.Compare{
resource.CompareKeyExists("existing-key"),
}
successOps := []resource.TxnOp{
{Type: resource.TxnOpPut, Key: "existing-key", Value: []byte("updated-value")},
}
failureOps := []resource.TxnOp{
{Type: resource.TxnOpPut, Key: "existing-failure-marker", Value: []byte("failure-executed")},
}
resp, err := kv.Txn(ctx, section, cmps, successOps, failureOps)
require.NoError(t, err)
assert.True(t, resp.Succeeded)
// Verify the key was updated
reader, err := kv.Get(ctx, section, "existing-key")
require.NoError(t, err)
value, err := io.ReadAll(reader)
require.NoError(t, err)
assert.Equal(t, "updated-value", string(value))
err = reader.Close()
require.NoError(t, err)
// Verify failure op was not executed
_, err = kv.Get(ctx, section, "existing-failure-marker")
assert.Error(t, err)
})
t.Run("txn compare exists fails when key exists but expected not to", func(t *testing.T) {
// First create a key
saveKVHelper(t, kv, ctx, section, "exists-fail-key", strings.NewReader("some-value"))
// Key exists but we expect it not to
cmps := []resource.Compare{
resource.CompareKeyNotExists("exists-fail-key"),
}
successOps := []resource.TxnOp{
{Type: resource.TxnOpPut, Key: "exists-fail-result", Value: []byte("should-not-exist")},
}
failureOps := []resource.TxnOp{
{Type: resource.TxnOpPut, Key: "exists-fail-marker", Value: []byte("failure-executed")},
}
resp, err := kv.Txn(ctx, section, cmps, successOps, failureOps)
require.NoError(t, err)
assert.False(t, resp.Succeeded)
// Verify success op was not executed
_, err = kv.Get(ctx, section, "exists-fail-result")
assert.Error(t, err)
assert.Equal(t, resource.ErrNotFound, err)
// Verify failure ops were executed
reader, err := kv.Get(ctx, section, "exists-fail-marker")
require.NoError(t, err)
value, err := io.ReadAll(reader)
require.NoError(t, err)
assert.Equal(t, "failure-executed", string(value))
err = reader.Close()
require.NoError(t, err)
})
t.Run("txn compare value equals", func(t *testing.T) {
// First create a key with known value
saveKVHelper(t, kv, ctx, section, "value-cmp-key", strings.NewReader("expected-value"))
cmps := []resource.Compare{
resource.CompareKeyValue("value-cmp-key", resource.CompareEqual, []byte("expected-value")),
}
successOps := []resource.TxnOp{
{Type: resource.TxnOpPut, Key: "value-cmp-key", Value: []byte("new-value")},
}
resp, err := kv.Txn(ctx, section, cmps, successOps, nil)
require.NoError(t, err)
assert.True(t, resp.Succeeded)
// Verify the key was updated
reader, err := kv.Get(ctx, section, "value-cmp-key")
require.NoError(t, err)
value, err := io.ReadAll(reader)
require.NoError(t, err)
assert.Equal(t, "new-value", string(value))
err = reader.Close()
require.NoError(t, err)
})
t.Run("txn compare value fails executes failure ops", func(t *testing.T) {
// First create a key with known value
saveKVHelper(t, kv, ctx, section, "value-fail-key", strings.NewReader("actual-value"))
cmps := []resource.Compare{
resource.CompareKeyValue("value-fail-key", resource.CompareEqual, []byte("wrong-value")),
}
successOps := []resource.TxnOp{
{Type: resource.TxnOpPut, Key: "value-fail-key", Value: []byte("should-not-be-set")},
}
failureOps := []resource.TxnOp{
{Type: resource.TxnOpPut, Key: "failure-marker", Value: []byte("failure-executed")},
}
resp, err := kv.Txn(ctx, section, cmps, successOps, failureOps)
require.NoError(t, err)
assert.False(t, resp.Succeeded)
// Verify the original key was not changed
reader, err := kv.Get(ctx, section, "value-fail-key")
require.NoError(t, err)
value, err := io.ReadAll(reader)
require.NoError(t, err)
assert.Equal(t, "actual-value", string(value))
err = reader.Close()
require.NoError(t, err)
// Verify failure ops were executed
reader, err = kv.Get(ctx, section, "failure-marker")
require.NoError(t, err)
value, err = io.ReadAll(reader)
require.NoError(t, err)
assert.Equal(t, "failure-executed", string(value))
err = reader.Close()
require.NoError(t, err)
})
t.Run("txn delete operation", func(t *testing.T) {
// First create a key
saveKVHelper(t, kv, ctx, section, "delete-txn-key", strings.NewReader("to-be-deleted"))
cmps := []resource.Compare{
resource.CompareKeyExists("delete-txn-key"),
}
successOps := []resource.TxnOp{
{Type: resource.TxnOpDelete, Key: "delete-txn-key"},
}
resp, err := kv.Txn(ctx, section, cmps, successOps, nil)
require.NoError(t, err)
assert.True(t, resp.Succeeded)
// Verify the key was deleted
_, err = kv.Get(ctx, section, "delete-txn-key")
assert.Error(t, err)
assert.Equal(t, resource.ErrNotFound, err)
})
t.Run("txn multiple comparisons all must pass", func(t *testing.T) {
// Create two keys
saveKVHelper(t, kv, ctx, section, "multi-cmp-key1", strings.NewReader("value1"))
saveKVHelper(t, kv, ctx, section, "multi-cmp-key2", strings.NewReader("value2"))
cmps := []resource.Compare{
resource.CompareKeyValue("multi-cmp-key1", resource.CompareEqual, []byte("value1")),
resource.CompareKeyValue("multi-cmp-key2", resource.CompareEqual, []byte("value2")),
}
successOps := []resource.TxnOp{
{Type: resource.TxnOpPut, Key: "multi-success", Value: []byte("both-passed")},
}
resp, err := kv.Txn(ctx, section, cmps, successOps, nil)
require.NoError(t, err)
assert.True(t, resp.Succeeded)
// Verify success op was executed
reader, err := kv.Get(ctx, section, "multi-success")
require.NoError(t, err)
value, err := io.ReadAll(reader)
require.NoError(t, err)
assert.Equal(t, "both-passed", string(value))
err = reader.Close()
require.NoError(t, err)
})
t.Run("txn multiple comparisons one fails", func(t *testing.T) {
// Create two keys
saveKVHelper(t, kv, ctx, section, "multi-fail-key1", strings.NewReader("value1"))
saveKVHelper(t, kv, ctx, section, "multi-fail-key2", strings.NewReader("value2"))
cmps := []resource.Compare{
resource.CompareKeyValue("multi-fail-key1", resource.CompareEqual, []byte("value1")),
resource.CompareKeyValue("multi-fail-key2", resource.CompareEqual, []byte("wrong-value")),
}
successOps := []resource.TxnOp{
{Type: resource.TxnOpPut, Key: "multi-fail-success", Value: []byte("should-not-exist")},
}
resp, err := kv.Txn(ctx, section, cmps, successOps, nil)
require.NoError(t, err)
assert.False(t, resp.Succeeded)
// Verify success op was not executed
_, err = kv.Get(ctx, section, "multi-fail-success")
assert.Error(t, err)
assert.Equal(t, resource.ErrNotFound, err)
})
t.Run("txn too many comparisons", func(t *testing.T) {
cmps := make([]resource.Compare, resource.MaxTxnCompares+1)
for i := range cmps {
cmps[i] = resource.CompareKeyNotExists(fmt.Sprintf("key-%d", i))
}
_, err := kv.Txn(ctx, section, cmps, nil, nil)
assert.Error(t, err)
assert.Contains(t, err.Error(), "too many comparisons")
})
t.Run("txn too many success operations", func(t *testing.T) {
ops := make([]resource.TxnOp, resource.MaxTxnOps+1)
for i := range ops {
ops[i] = resource.TxnOp{Type: resource.TxnOpPut, Key: fmt.Sprintf("key-%d", i), Value: []byte("value")}
}
_, err := kv.Txn(ctx, section, nil, ops, nil)
assert.Error(t, err)
assert.Contains(t, err.Error(), "too many success operations")
})
t.Run("txn too many failure operations", func(t *testing.T) {
ops := make([]resource.TxnOp, resource.MaxTxnOps+1)
for i := range ops {
ops[i] = resource.TxnOp{Type: resource.TxnOpPut, Key: fmt.Sprintf("key-%d", i), Value: []byte("value")}
}
_, err := kv.Txn(ctx, section, nil, nil, ops)
assert.Error(t, err)
assert.Contains(t, err.Error(), "too many failure operations")
})
t.Run("txn compare greater than", func(t *testing.T) {
saveKVHelper(t, kv, ctx, section, "greater-key", strings.NewReader("bbb"))
cmps := []resource.Compare{
resource.CompareKeyValue("greater-key", resource.CompareGreater, []byte("aaa")),
}
successOps := []resource.TxnOp{
{Type: resource.TxnOpPut, Key: "greater-result", Value: []byte("passed")},
}
resp, err := kv.Txn(ctx, section, cmps, successOps, nil)
require.NoError(t, err)
assert.True(t, resp.Succeeded)
})
t.Run("txn compare less than", func(t *testing.T) {
saveKVHelper(t, kv, ctx, section, "less-key", strings.NewReader("aaa"))
cmps := []resource.Compare{
resource.CompareKeyValue("less-key", resource.CompareLess, []byte("bbb")),
}
successOps := []resource.TxnOp{
{Type: resource.TxnOpPut, Key: "less-result", Value: []byte("passed")},
}
resp, err := kv.Txn(ctx, section, cmps, successOps, nil)
require.NoError(t, err)
assert.True(t, resp.Succeeded)
})
t.Run("txn compare not equal for non-existent value key", func(t *testing.T) {
// Key doesn't exist, comparing value should fail for equal but pass for not-equal
cmps := []resource.Compare{
resource.CompareKeyValue("non-existent-value-key", resource.CompareNotEqual, []byte("any-value")),
}
successOps := []resource.TxnOp{
resource.TxnPut("not-equal-result", []byte("passed")),
}
resp, err := kv.Txn(ctx, section, cmps, successOps, nil)
require.NoError(t, err)
assert.True(t, resp.Succeeded)
})
t.Run("txn using constructor functions", func(t *testing.T) {
// Test that constructor functions work correctly
saveKVHelper(t, kv, ctx, section, "constructor-key", strings.NewReader("constructor-value"))
cmps := []resource.Compare{
resource.CompareKeyExists("constructor-key"),
}
successOps := []resource.TxnOp{
resource.TxnPut("constructor-new", []byte("new-value")),
resource.TxnDelete("constructor-key"),
}
resp, err := kv.Txn(ctx, section, cmps, successOps, nil)
require.NoError(t, err)
assert.True(t, resp.Succeeded)
// Verify the operations actually happened
_, err = kv.Get(ctx, section, "constructor-key")
assert.Error(t, err)
assert.Equal(t, resource.ErrNotFound, err)
reader, err := kv.Get(ctx, section, "constructor-new")
require.NoError(t, err)
value, err := io.ReadAll(reader)
require.NoError(t, err)
assert.Equal(t, "new-value", string(value))
err = reader.Close()
require.NoError(t, err)
})
t.Run("txn nested transaction success path", func(t *testing.T) {
// Test nested transaction: if key1 exists, then check key2 and write based on that
saveKVHelper(t, kv, ctx, section, "nested-key1", strings.NewReader("value1"))
saveKVHelper(t, kv, ctx, section, "nested-key2", strings.NewReader("value2"))
cmps := []resource.Compare{
resource.CompareKeyExists("nested-key1"),
}
successOps := []resource.TxnOp{
// Nested transaction: if key2 has expected value, write result
resource.TxnNested(
[]resource.Compare{
resource.CompareKeyValue("nested-key2", resource.CompareEqual, []byte("value2")),
},
[]resource.TxnOp{
resource.TxnPut("nested-result", []byte("both-conditions-met")),
},
[]resource.TxnOp{
resource.TxnPut("nested-result", []byte("only-key1-exists")),
},
),
}
failureOps := []resource.TxnOp{
resource.TxnPut("nested-result", []byte("key1-not-found")),
}
resp, err := kv.Txn(ctx, section, cmps, successOps, failureOps)
require.NoError(t, err)
assert.True(t, resp.Succeeded)
// Verify the nested transaction executed the success path
reader, err := kv.Get(ctx, section, "nested-result")
require.NoError(t, err)
value, err := io.ReadAll(reader)
require.NoError(t, err)
assert.Equal(t, "both-conditions-met", string(value))
err = reader.Close()
require.NoError(t, err)
})
t.Run("txn nested transaction failure path", func(t *testing.T) {
// Test nested transaction where inner compare fails
saveKVHelper(t, kv, ctx, section, "nested-fail-key1", strings.NewReader("value1"))
saveKVHelper(t, kv, ctx, section, "nested-fail-key2", strings.NewReader("wrong-value"))
cmps := []resource.Compare{
resource.CompareKeyExists("nested-fail-key1"),
}
successOps := []resource.TxnOp{
// Nested transaction: key2 has wrong value, so inner failure ops execute
resource.TxnNested(
[]resource.Compare{
resource.CompareKeyValue("nested-fail-key2", resource.CompareEqual, []byte("expected-value")),
},
[]resource.TxnOp{
resource.TxnPut("nested-fail-result", []byte("inner-success")),
},
[]resource.TxnOp{
resource.TxnPut("nested-fail-result", []byte("inner-failure")),
},
),
}
resp, err := kv.Txn(ctx, section, cmps, successOps, nil)
require.NoError(t, err)
assert.True(t, resp.Succeeded) // Outer succeeded, inner failed
// Verify the nested transaction executed the failure path
reader, err := kv.Get(ctx, section, "nested-fail-result")
require.NoError(t, err)
value, err := io.ReadAll(reader)
require.NoError(t, err)
assert.Equal(t, "inner-failure", string(value))
err = reader.Close()
require.NoError(t, err)
})
t.Run("txn nested transaction max depth exceeded", func(t *testing.T) {
// Test that exceeding max depth (MaxTxnDepth=3) returns an error
// Create a 4-level deep nested transaction
deeplyNested := resource.TxnNested(
nil,
[]resource.TxnOp{
resource.TxnNested(
nil,
[]resource.TxnOp{
resource.TxnNested(
nil,
[]resource.TxnOp{
resource.TxnPut("deep-key", []byte("value")),
},
nil,
),
},
nil,
),
},
nil,
)
_, err := kv.Txn(ctx, section, nil, []resource.TxnOp{deeplyNested}, nil)
assert.Error(t, err)
assert.Contains(t, err.Error(), "nesting too deep")
})
t.Run("txn total payload size exceeded", func(t *testing.T) {
// Test that exceeding total payload size (1MB) returns an error
largeValue := make([]byte, 90*1024) // 90KB each
for i := range largeValue {
largeValue[i] = byte(i % 256)
}
// 8 compares + 8 success ops + 8 failure ops = 24 * 90KB = 2.16MB > 1MB limit
cmps := make([]resource.Compare, 8)
for i := range cmps {
cmps[i] = resource.CompareKeyValue(fmt.Sprintf("size-key-%d", i), resource.CompareEqual, largeValue)
}
successOps := make([]resource.TxnOp, 8)
for i := range successOps {
successOps[i] = resource.TxnPut(fmt.Sprintf("size-success-%d", i), largeValue)
}
failureOps := make([]resource.TxnOp, 8)
for i := range failureOps {
failureOps[i] = resource.TxnPut(fmt.Sprintf("size-failure-%d", i), largeValue)
}
_, err := kv.Txn(ctx, section, cmps, successOps, failureOps)
assert.Error(t, err)
assert.Contains(t, err.Error(), "total transaction payload too large")
})
t.Run("txn concurrent compare and swap", func(t *testing.T) {
// Test that when multiple goroutines try to compare-and-swap the same key,
// only one succeeds. All goroutines see the same initial value and try to
// update it atomically. This verifies serializable isolation.
//
// Expected behavior:
// - All goroutines read the initial value and their compare succeeds
// - All try to write, but only one can commit successfully
// - Others get either Succeeded=false (if compare ran after winner committed)
// or ErrConflict (if they tried to commit after winner but read before)
const numGoroutines = 10
casKey := "concurrent-cas-key"
initialValue := "initial-value"
// Create the key with an initial value
saveKVHelper(t, kv, ctx, section, casKey, strings.NewReader(initialValue))
type result struct {
id int
succeeded bool
err error
}
results := make(chan result, numGoroutines)
start := make(chan struct{})
// Launch goroutines that all try to compare-and-swap simultaneously
for i := 0; i < numGoroutines; i++ {
go func(id int) {
<-start // Wait for signal to start simultaneously
// Compare that the value is still the initial value, then update it
cmps := []resource.Compare{
resource.CompareKeyValue(casKey, resource.CompareEqual, []byte(initialValue)),
}
successOps := []resource.TxnOp{
resource.TxnPut(casKey, []byte(fmt.Sprintf("owner-%d", id))),
}
resp, err := kv.Txn(ctx, section, cmps, successOps, nil)
succeeded := err == nil && resp != nil && resp.Succeeded
results <- result{id: id, succeeded: succeeded, err: err}
}(i)
}
// Start all goroutines at once
close(start)
// Collect results
successCount := 0
conflictCount := 0
compareFailedCount := 0
winnerId := -1
for i := 0; i < numGoroutines; i++ {
r := <-results
if r.err != nil {
// ErrConflict is expected when multiple transactions race
conflictCount++
t.Logf("goroutine %d got conflict: %v", r.id, r.err)
} else if r.succeeded {
successCount++
winnerId = r.id
} else {
// Compare failed (saw updated value after winner committed)
compareFailedCount++
}
}
t.Logf("Results: %d succeeded, %d conflicts, %d compare-failed", successCount, conflictCount, compareFailedCount)
// Exactly one goroutine should have succeeded in the compare-and-swap
assert.Equal(t, 1, successCount, "exactly one transaction should succeed, but %d succeeded", successCount)
require.NotEqual(t, -1, winnerId, "should have a winner")
// All other goroutines should have either got a conflict or had their compare fail
assert.Equal(t, numGoroutines-1, conflictCount+compareFailedCount,
"all non-winners should have either conflict or compare-failed")
// Verify the key has the winner's value
reader, err := kv.Get(ctx, section, casKey)
require.NoError(t, err)
value, err := io.ReadAll(reader)
require.NoError(t, err)
expectedValue := fmt.Sprintf("owner-%d", winnerId)
assert.Equal(t, expectedValue, string(value), "value should match the winner's id")
err = reader.Close()
require.NoError(t, err)
})
}