Compare commits

...

3 Commits

Author SHA1 Message Date
Georges Chaudy
3c83c64d72 Implement batch operation support in KV storage
- Introduced a new Batch method to execute multiple operations atomically within a single transaction.
- Defined BatchOp types for various operations: put, create, update, and delete, with specific semantics for each.
- Updated the KV interface and implementation to support batch operations, ensuring rollback on failure.
- Enhanced testing suite to cover various batch scenarios, including success, failure, and edge cases, ensuring robust functionality.
2026-01-09 15:24:34 +01:00
Georges Chaudy
6e39b24b6f Enhance comparison functions in KV storage
- Updated the CompareKeyExists function to simplify its usage by removing the 'exists' parameter, making it always succeed if the key exists.
- Introduced a new CompareKeyNotExists function to check for non-existent keys.
- Modified transaction tests to utilize the new comparison functions, ensuring accurate behavior for both existing and non-existing keys.
2025-12-04 16:17:43 +01:00
Georges Chaudy
c31c1d8e8d Add transaction support to KV storage
- Introduced transaction operations with compare-and-swap semantics.
- Added types and functions for comparisons (e.g., CompareExists, CompareValue) and transaction operations (e.g., TxnOpPut, TxnOpDelete).
- Implemented Txn method to execute transactions based on comparisons and success/failure operations.
- Added validation for transaction requests to enforce limits on comparisons and operations.
- Enhanced testing suite to cover various transaction scenarios, including success and failure cases, and edge conditions.

This update enhances the KV interface, allowing for more complex data manipulation while ensuring data integrity through transactional guarantees.
2025-12-03 11:33:22 +01:00
2 changed files with 362 additions and 0 deletions

View File

@@ -35,6 +35,33 @@ type ListOptions struct {
Limit int64 // maximum number of results to return. 0 means no limit.
}
// BatchOpMode controls the semantics of each operation in a batch
type BatchOpMode int
const (
// BatchOpPut performs an upsert: create or update (never fails on key state)
BatchOpPut BatchOpMode = iota
// BatchOpCreate creates a new key, fails if the key already exists
BatchOpCreate
// BatchOpUpdate updates an existing key, fails if the key doesn't exist
BatchOpUpdate
// BatchOpDelete removes a key, idempotent (never fails on key state)
BatchOpDelete
)
// BatchOp represents a single operation in an atomic batch
type BatchOp struct {
Mode BatchOpMode
Key string
Value []byte // For Put/Create/Update operations, nil for Delete
}
// Maximum limit for batch operations
const MaxBatchOps = 20
// ErrKeyAlreadyExists is returned when BatchOpCreate is used on an existing key
var ErrKeyAlreadyExists = errors.New("key already exists")
type KV interface {
// Keys returns all the keys in the store
Keys(ctx context.Context, section string, opt ListOptions) iter.Seq2[string, error]
@@ -60,6 +87,17 @@ type KV interface {
// UnixTimestamp returns the current time in seconds since Epoch.
// This is used to ensure the server and client are not too far apart in time.
UnixTimestamp(ctx context.Context) (int64, error)
// Batch executes all operations atomically within a single transaction.
// If any operation fails, all operations are rolled back.
// Operations are executed in order; the batch stops on first failure.
//
// Operation semantics:
// - BatchOpPut: Upsert (create or update), never fails on key state
// - BatchOpCreate: Fail with ErrKeyAlreadyExists if key exists
// - BatchOpUpdate: Fail with ErrNotFound if key doesn't exist
// - BatchOpDelete: Idempotent, never fails on key state
Batch(ctx context.Context, section string, ops []BatchOp) error
}
var _ KV = &badgerKV{}
@@ -360,3 +398,69 @@ func (k *badgerKV) BatchDelete(ctx context.Context, section string, keys []strin
return txn.Commit()
}
func (k *badgerKV) Batch(ctx context.Context, section string, ops []BatchOp) error {
if k.db.IsClosed() {
return fmt.Errorf("database is closed")
}
if section == "" {
return fmt.Errorf("section is required")
}
if len(ops) > MaxBatchOps {
return fmt.Errorf("too many operations: %d > %d", len(ops), MaxBatchOps)
}
txn := k.db.NewTransaction(true)
defer txn.Discard()
for _, op := range ops {
keyWithSection := section + "/" + op.Key
switch op.Mode {
case BatchOpCreate:
// Check that key doesn't exist, then set
_, err := txn.Get([]byte(keyWithSection))
if err == nil {
return ErrKeyAlreadyExists
}
if !errors.Is(err, badger.ErrKeyNotFound) {
return err
}
if err := txn.Set([]byte(keyWithSection), op.Value); err != nil {
return err
}
case BatchOpUpdate:
// Check that key exists, then set
_, err := txn.Get([]byte(keyWithSection))
if errors.Is(err, badger.ErrKeyNotFound) {
return ErrNotFound
}
if err != nil {
return err
}
if err := txn.Set([]byte(keyWithSection), op.Value); err != nil {
return err
}
case BatchOpPut:
// Upsert: create or update
if err := txn.Set([]byte(keyWithSection), op.Value); err != nil {
return err
}
case BatchOpDelete:
// Idempotent delete - don't error if not found
if err := txn.Delete([]byte(keyWithSection)); err != nil {
return err
}
default:
return fmt.Errorf("unknown operation mode: %d", op.Mode)
}
}
return txn.Commit()
}

View File

@@ -28,6 +28,7 @@ const (
TestKVUnixTimestamp = "unix timestamp"
TestKVBatchGet = "batch get operations"
TestKVBatchDelete = "batch delete operations"
TestKVBatch = "batch operations"
)
// NewKVFunc is a function that creates a new KV instance for testing
@@ -69,6 +70,7 @@ func RunKVTest(t *testing.T, newKV NewKVFunc, opts *KVTestOptions) {
{TestKVUnixTimestamp, runTestKVUnixTimestamp},
{TestKVBatchGet, runTestKVBatchGet},
{TestKVBatchDelete, runTestKVBatchDelete},
{TestKVBatch, runTestKVBatch},
}
for _, tc := range cases {
@@ -801,3 +803,259 @@ func saveKVHelper(t *testing.T, kv resource.KV, ctx context.Context, section, ke
err = writer.Close()
require.NoError(t, err)
}
func runTestKVBatch(t *testing.T, kv resource.KV, nsPrefix string) {
ctx := testutil.NewTestContext(t, time.Now().Add(30*time.Second))
section := nsPrefix + "-batch"
t.Run("batch with empty section", func(t *testing.T) {
err := kv.Batch(ctx, "", nil)
assert.Error(t, err)
assert.Contains(t, err.Error(), "section is required")
})
t.Run("batch with empty ops succeeds", func(t *testing.T) {
err := kv.Batch(ctx, section, nil)
require.NoError(t, err)
})
t.Run("batch put creates new key", func(t *testing.T) {
ops := []resource.BatchOp{
{Mode: resource.BatchOpPut, Key: "put-key", Value: []byte("put-value")},
}
err := kv.Batch(ctx, section, ops)
require.NoError(t, err)
// Verify the key was created
reader, err := kv.Get(ctx, section, "put-key")
require.NoError(t, err)
value, err := io.ReadAll(reader)
require.NoError(t, err)
assert.Equal(t, "put-value", string(value))
err = reader.Close()
require.NoError(t, err)
})
t.Run("batch put updates existing key", func(t *testing.T) {
// First create a key
saveKVHelper(t, kv, ctx, section, "put-update-key", strings.NewReader("original-value"))
ops := []resource.BatchOp{
{Mode: resource.BatchOpPut, Key: "put-update-key", Value: []byte("updated-value")},
}
err := kv.Batch(ctx, section, ops)
require.NoError(t, err)
// Verify the key was updated
reader, err := kv.Get(ctx, section, "put-update-key")
require.NoError(t, err)
value, err := io.ReadAll(reader)
require.NoError(t, err)
assert.Equal(t, "updated-value", string(value))
err = reader.Close()
require.NoError(t, err)
})
t.Run("batch create succeeds for new key", func(t *testing.T) {
ops := []resource.BatchOp{
{Mode: resource.BatchOpCreate, Key: "create-new-key", Value: []byte("new-value")},
}
err := kv.Batch(ctx, section, ops)
require.NoError(t, err)
// Verify the key was created
reader, err := kv.Get(ctx, section, "create-new-key")
require.NoError(t, err)
value, err := io.ReadAll(reader)
require.NoError(t, err)
assert.Equal(t, "new-value", string(value))
err = reader.Close()
require.NoError(t, err)
})
t.Run("batch create fails for existing key", func(t *testing.T) {
// First create a key
saveKVHelper(t, kv, ctx, section, "create-exists-key", strings.NewReader("existing-value"))
ops := []resource.BatchOp{
{Mode: resource.BatchOpCreate, Key: "create-exists-key", Value: []byte("new-value")},
}
err := kv.Batch(ctx, section, ops)
assert.ErrorIs(t, err, resource.ErrKeyAlreadyExists)
// Verify the original value is unchanged
reader, err := kv.Get(ctx, section, "create-exists-key")
require.NoError(t, err)
value, err := io.ReadAll(reader)
require.NoError(t, err)
assert.Equal(t, "existing-value", string(value))
err = reader.Close()
require.NoError(t, err)
})
t.Run("batch update succeeds for existing key", func(t *testing.T) {
// First create a key
saveKVHelper(t, kv, ctx, section, "update-exists-key", strings.NewReader("original-value"))
ops := []resource.BatchOp{
{Mode: resource.BatchOpUpdate, Key: "update-exists-key", Value: []byte("updated-value")},
}
err := kv.Batch(ctx, section, ops)
require.NoError(t, err)
// Verify the key was updated
reader, err := kv.Get(ctx, section, "update-exists-key")
require.NoError(t, err)
value, err := io.ReadAll(reader)
require.NoError(t, err)
assert.Equal(t, "updated-value", string(value))
err = reader.Close()
require.NoError(t, err)
})
t.Run("batch update fails for non-existent key", func(t *testing.T) {
ops := []resource.BatchOp{
{Mode: resource.BatchOpUpdate, Key: "update-nonexistent-key", Value: []byte("new-value")},
}
err := kv.Batch(ctx, section, ops)
assert.ErrorIs(t, err, resource.ErrNotFound)
// Verify the key was not created
_, err = kv.Get(ctx, section, "update-nonexistent-key")
assert.ErrorIs(t, err, resource.ErrNotFound)
})
t.Run("batch delete removes existing key", func(t *testing.T) {
// First create a key
saveKVHelper(t, kv, ctx, section, "delete-exists-key", strings.NewReader("to-be-deleted"))
ops := []resource.BatchOp{
{Mode: resource.BatchOpDelete, Key: "delete-exists-key"},
}
err := kv.Batch(ctx, section, ops)
require.NoError(t, err)
// Verify the key was deleted
_, err = kv.Get(ctx, section, "delete-exists-key")
assert.ErrorIs(t, err, resource.ErrNotFound)
})
t.Run("batch delete is idempotent for non-existent key", func(t *testing.T) {
ops := []resource.BatchOp{
{Mode: resource.BatchOpDelete, Key: "delete-nonexistent-key"},
}
err := kv.Batch(ctx, section, ops)
require.NoError(t, err) // Should succeed even though key doesn't exist
})
t.Run("batch multiple operations atomic success", func(t *testing.T) {
ops := []resource.BatchOp{
{Mode: resource.BatchOpPut, Key: "multi-key1", Value: []byte("value1")},
{Mode: resource.BatchOpPut, Key: "multi-key2", Value: []byte("value2")},
{Mode: resource.BatchOpPut, Key: "multi-key3", Value: []byte("value3")},
}
err := kv.Batch(ctx, section, ops)
require.NoError(t, err)
// Verify all keys were created
for i := 1; i <= 3; i++ {
key := fmt.Sprintf("multi-key%d", i)
reader, err := kv.Get(ctx, section, key)
require.NoError(t, err)
value, err := io.ReadAll(reader)
require.NoError(t, err)
assert.Equal(t, fmt.Sprintf("value%d", i), string(value))
err = reader.Close()
require.NoError(t, err)
}
})
t.Run("batch multiple operations atomic rollback on failure", func(t *testing.T) {
// First create a key that will cause the batch to fail
saveKVHelper(t, kv, ctx, section, "rollback-exists", strings.NewReader("existing"))
ops := []resource.BatchOp{
{Mode: resource.BatchOpPut, Key: "rollback-new1", Value: []byte("value1")},
{Mode: resource.BatchOpCreate, Key: "rollback-exists", Value: []byte("should-fail")}, // This will fail
{Mode: resource.BatchOpPut, Key: "rollback-new2", Value: []byte("value2")},
}
err := kv.Batch(ctx, section, ops)
assert.ErrorIs(t, err, resource.ErrKeyAlreadyExists)
// Verify rollback: the first operation should NOT have persisted
_, err = kv.Get(ctx, section, "rollback-new1")
assert.ErrorIs(t, err, resource.ErrNotFound)
// Verify the third operation was not executed
_, err = kv.Get(ctx, section, "rollback-new2")
assert.ErrorIs(t, err, resource.ErrNotFound)
})
t.Run("batch mixed operations", func(t *testing.T) {
// Setup: create a key to update and one to delete
saveKVHelper(t, kv, ctx, section, "mixed-update", strings.NewReader("original"))
saveKVHelper(t, kv, ctx, section, "mixed-delete", strings.NewReader("to-delete"))
ops := []resource.BatchOp{
{Mode: resource.BatchOpCreate, Key: "mixed-create", Value: []byte("created")},
{Mode: resource.BatchOpUpdate, Key: "mixed-update", Value: []byte("updated")},
{Mode: resource.BatchOpDelete, Key: "mixed-delete"},
{Mode: resource.BatchOpPut, Key: "mixed-put", Value: []byte("put")},
}
err := kv.Batch(ctx, section, ops)
require.NoError(t, err)
// Verify create
reader, err := kv.Get(ctx, section, "mixed-create")
require.NoError(t, err)
value, err := io.ReadAll(reader)
require.NoError(t, err)
assert.Equal(t, "created", string(value))
err = reader.Close()
require.NoError(t, err)
// Verify update
reader, err = kv.Get(ctx, section, "mixed-update")
require.NoError(t, err)
value, err = io.ReadAll(reader)
require.NoError(t, err)
assert.Equal(t, "updated", string(value))
err = reader.Close()
require.NoError(t, err)
// Verify delete
_, err = kv.Get(ctx, section, "mixed-delete")
assert.ErrorIs(t, err, resource.ErrNotFound)
// Verify put
reader, err = kv.Get(ctx, section, "mixed-put")
require.NoError(t, err)
value, err = io.ReadAll(reader)
require.NoError(t, err)
assert.Equal(t, "put", string(value))
err = reader.Close()
require.NoError(t, err)
})
t.Run("batch too many operations", func(t *testing.T) {
ops := make([]resource.BatchOp, resource.MaxBatchOps+1)
for i := range ops {
ops[i] = resource.BatchOp{Mode: resource.BatchOpPut, Key: fmt.Sprintf("key-%d", i), Value: []byte("value")}
}
err := kv.Batch(ctx, section, ops)
assert.Error(t, err)
assert.Contains(t, err.Error(), "too many operations")
})
}