Compare commits

...

2 Commits

Author SHA1 Message Date
Georges Chaudy
6e39b24b6f Enhance comparison functions in KV storage
- Updated the CompareKeyExists function to simplify its usage by removing the 'exists' parameter, making it always succeed if the key exists.
- Introduced a new CompareKeyNotExists function to check for non-existent keys.
- Modified transaction tests to utilize the new comparison functions, ensuring accurate behavior for both existing and non-existing keys.
2025-12-04 16:17:43 +01:00
Georges Chaudy
c31c1d8e8d Add transaction support to KV storage
- Introduced transaction operations with compare-and-swap semantics.
- Added types and functions for comparisons (e.g., CompareExists, CompareValue) and transaction operations (e.g., TxnOpPut, TxnOpDelete).
- Implemented Txn method to execute transactions based on comparisons and success/failure operations.
- Added validation for transaction requests to enforce limits on comparisons and operations.
- Enhanced testing suite to cover various transaction scenarios, including success and failure cases, and edge conditions.

This update enhances the KV interface, allowing for more complex data manipulation while ensuring data integrity through transactional guarantees.
2025-12-03 11:33:22 +01:00
2 changed files with 577 additions and 0 deletions

View File

@@ -35,6 +35,109 @@ type ListOptions struct {
Limit int64 // maximum number of results to return. 0 means no limit.
}
// CompareTarget specifies what to compare in a transaction
type CompareTarget int
const (
CompareExists CompareTarget = iota // Check if key exists (Value: bool)
CompareValue // Compare actual value (Value: []byte)
)
// CompareResult specifies the comparison operator
type CompareResult int
const (
CompareEqual CompareResult = iota
CompareNotEqual
CompareGreater
CompareLess
)
// Compare represents a single comparison in a transaction.
// Use the constructor functions CompareKeyExists, CompareKeyNotExists, and CompareKeyValue to create comparisons.
type Compare struct {
Key string
Target CompareTarget
Result CompareResult // Only used for CompareValue
Exists bool // Used when Target == CompareExists
Value []byte // Used when Target == CompareValue
}
// CompareKeyExists creates a comparison that succeeds if the key exists.
func CompareKeyExists(key string) Compare {
return Compare{Key: key, Target: CompareExists, Exists: true}
}
// CompareKeyNotExists creates a comparison that succeeds if the key does not exist.
func CompareKeyNotExists(key string) Compare {
return Compare{Key: key, Target: CompareExists, Exists: false}
}
// CompareKeyValue creates a comparison that compares the value of a key.
// The comparison succeeds if the stored value matches the expected value according to the result operator.
func CompareKeyValue(key string, result CompareResult, value []byte) Compare {
return Compare{Key: key, Target: CompareValue, Result: result, Value: value}
}
// TxnOpType specifies the type of operation in a transaction
type TxnOpType int
const (
TxnOpPut TxnOpType = iota
TxnOpDelete
)
// TxnOp represents an operation in a transaction.
// Use the constructor functions TxnPut and TxnDelete to create operations.
type TxnOp struct {
Type TxnOpType
Key string
Value []byte // For Put operations
}
// TxnPut creates a Put operation that stores a value at the given key.
func TxnPut(key string, value []byte) TxnOp {
return TxnOp{Type: TxnOpPut, Key: key, Value: value}
}
// TxnDelete creates a Delete operation that removes the given key.
func TxnDelete(key string) TxnOp {
return TxnOp{Type: TxnOpDelete, Key: key}
}
// TxnResponse contains the result of a transaction
type TxnResponse struct {
// Succeeded indicates whether the comparisons passed (true) or failed (false)
Succeeded bool
}
// Maximum limits for transaction operations
const (
MaxTxnCompares = 8
MaxTxnOps = 8
)
// ValidateTxnRequest validates the transaction request parameters.
func ValidateTxnRequest(section string, cmps []Compare, successOps []TxnOp, failureOps []TxnOp) error {
if section == "" {
return fmt.Errorf("section is required")
}
if len(cmps) > MaxTxnCompares {
return fmt.Errorf("too many comparisons: %d > %d", len(cmps), MaxTxnCompares)
}
if len(successOps) > MaxTxnOps {
return fmt.Errorf("too many success operations: %d > %d", len(successOps), MaxTxnOps)
}
if len(failureOps) > MaxTxnOps {
return fmt.Errorf("too many failure operations: %d > %d", len(failureOps), MaxTxnOps)
}
return nil
}
type KV interface {
// Keys returns all the keys in the store
Keys(ctx context.Context, section string, opt ListOptions) iter.Seq2[string, error]
@@ -60,6 +163,11 @@ type KV interface {
// UnixTimestamp returns the current time in seconds since Epoch.
// This is used to ensure the server and client are not too far apart in time.
UnixTimestamp(ctx context.Context) (int64, error)
// Txn executes a transaction with compare-and-swap semantics.
// If all comparisons succeed, successOps are executed; otherwise failureOps are executed.
// Limited to MaxTxnCompares comparisons and MaxTxnOps operations each for success/failure.
Txn(ctx context.Context, section string, cmps []Compare, successOps []TxnOp, failureOps []TxnOp) (*TxnResponse, error)
}
var _ KV = &badgerKV{}
@@ -360,3 +468,102 @@ func (k *badgerKV) BatchDelete(ctx context.Context, section string, keys []strin
return txn.Commit()
}
func (k *badgerKV) Txn(ctx context.Context, section string, cmps []Compare, successOps []TxnOp, failureOps []TxnOp) (*TxnResponse, error) {
if k.db.IsClosed() {
return nil, fmt.Errorf("database is closed")
}
if err := ValidateTxnRequest(section, cmps, successOps, failureOps); err != nil {
return nil, err
}
txn := k.db.NewTransaction(true)
defer txn.Discard()
// Evaluate all comparisons
succeeded := true
for _, cmp := range cmps {
keyWithSection := section + "/" + cmp.Key
item, err := txn.Get([]byte(keyWithSection))
keyExists := err == nil
if err != nil && !errors.Is(err, badger.ErrKeyNotFound) {
return nil, err
}
switch cmp.Target {
case CompareExists:
if keyExists != cmp.Exists {
succeeded = false
}
case CompareValue:
if !keyExists {
// Key doesn't exist, comparison fails unless comparing for not-equal
if cmp.Result != CompareNotEqual {
succeeded = false
}
} else {
itemValue, err := item.ValueCopy(nil)
if err != nil {
return nil, err
}
if !compareBytes(itemValue, cmp.Value, cmp.Result) {
succeeded = false
}
}
default:
return nil, fmt.Errorf("unknown compare target: %d", cmp.Target)
}
if !succeeded {
break
}
}
// Execute the appropriate operations
ops := successOps
if !succeeded {
ops = failureOps
}
for _, op := range ops {
keyWithSection := section + "/" + op.Key
switch op.Type {
case TxnOpPut:
if err := txn.Set([]byte(keyWithSection), op.Value); err != nil {
return nil, err
}
case TxnOpDelete:
if err := txn.Delete([]byte(keyWithSection)); err != nil {
return nil, err
}
default:
return nil, fmt.Errorf("unknown operation type: %d", op.Type)
}
}
if err := txn.Commit(); err != nil {
return nil, err
}
return &TxnResponse{Succeeded: succeeded}, nil
}
// compareBytes compares two byte slices based on the comparison result type
func compareBytes(a, b []byte, result CompareResult) bool {
cmp := bytes.Compare(a, b)
switch result {
case CompareEqual:
return cmp == 0
case CompareNotEqual:
return cmp != 0
case CompareGreater:
return cmp > 0
case CompareLess:
return cmp < 0
default:
return false
}
}

View File

@@ -28,6 +28,7 @@ const (
TestKVUnixTimestamp = "unix timestamp"
TestKVBatchGet = "batch get operations"
TestKVBatchDelete = "batch delete operations"
TestKVTxn = "transaction operations"
)
// NewKVFunc is a function that creates a new KV instance for testing
@@ -69,6 +70,7 @@ func RunKVTest(t *testing.T, newKV NewKVFunc, opts *KVTestOptions) {
{TestKVUnixTimestamp, runTestKVUnixTimestamp},
{TestKVBatchGet, runTestKVBatchGet},
{TestKVBatchDelete, runTestKVBatchDelete},
{TestKVTxn, runTestKVTxn},
}
for _, tc := range cases {
@@ -801,3 +803,371 @@ func saveKVHelper(t *testing.T, kv resource.KV, ctx context.Context, section, ke
err = writer.Close()
require.NoError(t, err)
}
func runTestKVTxn(t *testing.T, kv resource.KV, nsPrefix string) {
ctx := testutil.NewTestContext(t, time.Now().Add(30*time.Second))
section := nsPrefix + "-txn"
t.Run("txn with empty section", func(t *testing.T) {
_, err := kv.Txn(ctx, "", nil, nil, nil)
assert.Error(t, err)
assert.Contains(t, err.Error(), "section is required")
})
t.Run("txn with no comparisons executes success ops", func(t *testing.T) {
// With no comparisons, all comparisons "pass" so success ops should run
successOps := []resource.TxnOp{
{Type: resource.TxnOpPut, Key: "no-cmp-key", Value: []byte("success-value")},
}
resp, err := kv.Txn(ctx, section, nil, successOps, nil)
require.NoError(t, err)
assert.True(t, resp.Succeeded)
// Verify the key was created
reader, err := kv.Get(ctx, section, "no-cmp-key")
require.NoError(t, err)
value, err := io.ReadAll(reader)
require.NoError(t, err)
assert.Equal(t, "success-value", string(value))
err = reader.Close()
require.NoError(t, err)
})
t.Run("txn compare exists false for non-existent key", func(t *testing.T) {
// Key doesn't exist, so exists should be false
cmps := []resource.Compare{
resource.CompareKeyNotExists("non-existent-key"),
}
successOps := []resource.TxnOp{
{Type: resource.TxnOpPut, Key: "created-key", Value: []byte("created-value")},
}
failureOps := []resource.TxnOp{
{Type: resource.TxnOpPut, Key: "non-existent-failure-marker", Value: []byte("failure-executed")},
}
resp, err := kv.Txn(ctx, section, cmps, successOps, failureOps)
require.NoError(t, err)
assert.True(t, resp.Succeeded)
// Verify the key was created
reader, err := kv.Get(ctx, section, "created-key")
require.NoError(t, err)
value, err := io.ReadAll(reader)
require.NoError(t, err)
assert.Equal(t, "created-value", string(value))
err = reader.Close()
require.NoError(t, err)
// Verify failure op was not executed
_, err = kv.Get(ctx, section, "non-existent-failure-marker")
assert.Error(t, err)
})
t.Run("txn compare exists true for existing key", func(t *testing.T) {
// First create a key
saveKVHelper(t, kv, ctx, section, "existing-key", strings.NewReader("existing-value"))
// Key exists, so exists should be true
cmps := []resource.Compare{
resource.CompareKeyExists("existing-key"),
}
successOps := []resource.TxnOp{
{Type: resource.TxnOpPut, Key: "existing-key", Value: []byte("updated-value")},
}
failureOps := []resource.TxnOp{
{Type: resource.TxnOpPut, Key: "existing-failure-marker", Value: []byte("failure-executed")},
}
resp, err := kv.Txn(ctx, section, cmps, successOps, failureOps)
require.NoError(t, err)
assert.True(t, resp.Succeeded)
// Verify the key was updated
reader, err := kv.Get(ctx, section, "existing-key")
require.NoError(t, err)
value, err := io.ReadAll(reader)
require.NoError(t, err)
assert.Equal(t, "updated-value", string(value))
err = reader.Close()
require.NoError(t, err)
// Verify failure op was not executed
_, err = kv.Get(ctx, section, "existing-failure-marker")
assert.Error(t, err)
})
t.Run("txn compare exists fails when key exists but expected not to", func(t *testing.T) {
// First create a key
saveKVHelper(t, kv, ctx, section, "exists-fail-key", strings.NewReader("some-value"))
// Key exists but we expect it not to
cmps := []resource.Compare{
resource.CompareKeyNotExists("exists-fail-key"),
}
successOps := []resource.TxnOp{
{Type: resource.TxnOpPut, Key: "exists-fail-result", Value: []byte("should-not-exist")},
}
failureOps := []resource.TxnOp{
{Type: resource.TxnOpPut, Key: "exists-fail-marker", Value: []byte("failure-executed")},
}
resp, err := kv.Txn(ctx, section, cmps, successOps, failureOps)
require.NoError(t, err)
assert.False(t, resp.Succeeded)
// Verify success op was not executed
_, err = kv.Get(ctx, section, "exists-fail-result")
assert.Error(t, err)
assert.Equal(t, resource.ErrNotFound, err)
// Verify failure ops were executed
reader, err := kv.Get(ctx, section, "exists-fail-marker")
require.NoError(t, err)
value, err := io.ReadAll(reader)
require.NoError(t, err)
assert.Equal(t, "failure-executed", string(value))
err = reader.Close()
require.NoError(t, err)
})
t.Run("txn compare value equals", func(t *testing.T) {
// First create a key with known value
saveKVHelper(t, kv, ctx, section, "value-cmp-key", strings.NewReader("expected-value"))
cmps := []resource.Compare{
resource.CompareKeyValue("value-cmp-key", resource.CompareEqual, []byte("expected-value")),
}
successOps := []resource.TxnOp{
{Type: resource.TxnOpPut, Key: "value-cmp-key", Value: []byte("new-value")},
}
resp, err := kv.Txn(ctx, section, cmps, successOps, nil)
require.NoError(t, err)
assert.True(t, resp.Succeeded)
// Verify the key was updated
reader, err := kv.Get(ctx, section, "value-cmp-key")
require.NoError(t, err)
value, err := io.ReadAll(reader)
require.NoError(t, err)
assert.Equal(t, "new-value", string(value))
err = reader.Close()
require.NoError(t, err)
})
t.Run("txn compare value fails executes failure ops", func(t *testing.T) {
// First create a key with known value
saveKVHelper(t, kv, ctx, section, "value-fail-key", strings.NewReader("actual-value"))
cmps := []resource.Compare{
resource.CompareKeyValue("value-fail-key", resource.CompareEqual, []byte("wrong-value")),
}
successOps := []resource.TxnOp{
{Type: resource.TxnOpPut, Key: "value-fail-key", Value: []byte("should-not-be-set")},
}
failureOps := []resource.TxnOp{
{Type: resource.TxnOpPut, Key: "failure-marker", Value: []byte("failure-executed")},
}
resp, err := kv.Txn(ctx, section, cmps, successOps, failureOps)
require.NoError(t, err)
assert.False(t, resp.Succeeded)
// Verify the original key was not changed
reader, err := kv.Get(ctx, section, "value-fail-key")
require.NoError(t, err)
value, err := io.ReadAll(reader)
require.NoError(t, err)
assert.Equal(t, "actual-value", string(value))
err = reader.Close()
require.NoError(t, err)
// Verify failure ops were executed
reader, err = kv.Get(ctx, section, "failure-marker")
require.NoError(t, err)
value, err = io.ReadAll(reader)
require.NoError(t, err)
assert.Equal(t, "failure-executed", string(value))
err = reader.Close()
require.NoError(t, err)
})
t.Run("txn delete operation", func(t *testing.T) {
// First create a key
saveKVHelper(t, kv, ctx, section, "delete-txn-key", strings.NewReader("to-be-deleted"))
cmps := []resource.Compare{
resource.CompareKeyExists("delete-txn-key"),
}
successOps := []resource.TxnOp{
{Type: resource.TxnOpDelete, Key: "delete-txn-key"},
}
resp, err := kv.Txn(ctx, section, cmps, successOps, nil)
require.NoError(t, err)
assert.True(t, resp.Succeeded)
// Verify the key was deleted
_, err = kv.Get(ctx, section, "delete-txn-key")
assert.Error(t, err)
assert.Equal(t, resource.ErrNotFound, err)
})
t.Run("txn multiple comparisons all must pass", func(t *testing.T) {
// Create two keys
saveKVHelper(t, kv, ctx, section, "multi-cmp-key1", strings.NewReader("value1"))
saveKVHelper(t, kv, ctx, section, "multi-cmp-key2", strings.NewReader("value2"))
cmps := []resource.Compare{
resource.CompareKeyValue("multi-cmp-key1", resource.CompareEqual, []byte("value1")),
resource.CompareKeyValue("multi-cmp-key2", resource.CompareEqual, []byte("value2")),
}
successOps := []resource.TxnOp{
{Type: resource.TxnOpPut, Key: "multi-success", Value: []byte("both-passed")},
}
resp, err := kv.Txn(ctx, section, cmps, successOps, nil)
require.NoError(t, err)
assert.True(t, resp.Succeeded)
// Verify success op was executed
reader, err := kv.Get(ctx, section, "multi-success")
require.NoError(t, err)
value, err := io.ReadAll(reader)
require.NoError(t, err)
assert.Equal(t, "both-passed", string(value))
err = reader.Close()
require.NoError(t, err)
})
t.Run("txn multiple comparisons one fails", func(t *testing.T) {
// Create two keys
saveKVHelper(t, kv, ctx, section, "multi-fail-key1", strings.NewReader("value1"))
saveKVHelper(t, kv, ctx, section, "multi-fail-key2", strings.NewReader("value2"))
cmps := []resource.Compare{
resource.CompareKeyValue("multi-fail-key1", resource.CompareEqual, []byte("value1")),
resource.CompareKeyValue("multi-fail-key2", resource.CompareEqual, []byte("wrong-value")),
}
successOps := []resource.TxnOp{
{Type: resource.TxnOpPut, Key: "multi-fail-success", Value: []byte("should-not-exist")},
}
resp, err := kv.Txn(ctx, section, cmps, successOps, nil)
require.NoError(t, err)
assert.False(t, resp.Succeeded)
// Verify success op was not executed
_, err = kv.Get(ctx, section, "multi-fail-success")
assert.Error(t, err)
assert.Equal(t, resource.ErrNotFound, err)
})
t.Run("txn too many comparisons", func(t *testing.T) {
cmps := make([]resource.Compare, resource.MaxTxnCompares+1)
for i := range cmps {
cmps[i] = resource.CompareKeyNotExists(fmt.Sprintf("key-%d", i))
}
_, err := kv.Txn(ctx, section, cmps, nil, nil)
assert.Error(t, err)
assert.Contains(t, err.Error(), "too many comparisons")
})
t.Run("txn too many success operations", func(t *testing.T) {
ops := make([]resource.TxnOp, resource.MaxTxnOps+1)
for i := range ops {
ops[i] = resource.TxnOp{Type: resource.TxnOpPut, Key: fmt.Sprintf("key-%d", i), Value: []byte("value")}
}
_, err := kv.Txn(ctx, section, nil, ops, nil)
assert.Error(t, err)
assert.Contains(t, err.Error(), "too many success operations")
})
t.Run("txn too many failure operations", func(t *testing.T) {
ops := make([]resource.TxnOp, resource.MaxTxnOps+1)
for i := range ops {
ops[i] = resource.TxnOp{Type: resource.TxnOpPut, Key: fmt.Sprintf("key-%d", i), Value: []byte("value")}
}
_, err := kv.Txn(ctx, section, nil, nil, ops)
assert.Error(t, err)
assert.Contains(t, err.Error(), "too many failure operations")
})
t.Run("txn compare greater than", func(t *testing.T) {
saveKVHelper(t, kv, ctx, section, "greater-key", strings.NewReader("bbb"))
cmps := []resource.Compare{
resource.CompareKeyValue("greater-key", resource.CompareGreater, []byte("aaa")),
}
successOps := []resource.TxnOp{
{Type: resource.TxnOpPut, Key: "greater-result", Value: []byte("passed")},
}
resp, err := kv.Txn(ctx, section, cmps, successOps, nil)
require.NoError(t, err)
assert.True(t, resp.Succeeded)
})
t.Run("txn compare less than", func(t *testing.T) {
saveKVHelper(t, kv, ctx, section, "less-key", strings.NewReader("aaa"))
cmps := []resource.Compare{
resource.CompareKeyValue("less-key", resource.CompareLess, []byte("bbb")),
}
successOps := []resource.TxnOp{
{Type: resource.TxnOpPut, Key: "less-result", Value: []byte("passed")},
}
resp, err := kv.Txn(ctx, section, cmps, successOps, nil)
require.NoError(t, err)
assert.True(t, resp.Succeeded)
})
t.Run("txn compare not equal for non-existent value key", func(t *testing.T) {
// Key doesn't exist, comparing value should fail for equal but pass for not-equal
cmps := []resource.Compare{
resource.CompareKeyValue("non-existent-value-key", resource.CompareNotEqual, []byte("any-value")),
}
successOps := []resource.TxnOp{
resource.TxnPut("not-equal-result", []byte("passed")),
}
resp, err := kv.Txn(ctx, section, cmps, successOps, nil)
require.NoError(t, err)
assert.True(t, resp.Succeeded)
})
t.Run("txn using constructor functions", func(t *testing.T) {
// Test that constructor functions work correctly
saveKVHelper(t, kv, ctx, section, "constructor-key", strings.NewReader("constructor-value"))
cmps := []resource.Compare{
resource.CompareKeyExists("constructor-key"),
}
successOps := []resource.TxnOp{
resource.TxnPut("constructor-new", []byte("new-value")),
resource.TxnDelete("constructor-key"),
}
resp, err := kv.Txn(ctx, section, cmps, successOps, nil)
require.NoError(t, err)
assert.True(t, resp.Succeeded)
// Verify the operations actually happened
_, err = kv.Get(ctx, section, "constructor-key")
assert.Error(t, err)
assert.Equal(t, resource.ErrNotFound, err)
reader, err := kv.Get(ctx, section, "constructor-new")
require.NoError(t, err)
value, err := io.ReadAll(reader)
require.NoError(t, err)
assert.Equal(t, "new-value", string(value))
err = reader.Close()
require.NoError(t, err)
})
}