mirror of
https://github.com/grafana/grafana.git
synced 2026-01-11 22:44:06 +08:00
Compare commits
3 Commits
njvrzm/err
...
kvstore-ba
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3c83c64d72 | ||
|
|
6e39b24b6f | ||
|
|
c31c1d8e8d |
@@ -35,6 +35,33 @@ type ListOptions struct {
|
||||
Limit int64 // maximum number of results to return. 0 means no limit.
|
||||
}
|
||||
|
||||
// BatchOpMode controls the semantics of each operation in a batch
|
||||
type BatchOpMode int
|
||||
|
||||
const (
|
||||
// BatchOpPut performs an upsert: create or update (never fails on key state)
|
||||
BatchOpPut BatchOpMode = iota
|
||||
// BatchOpCreate creates a new key, fails if the key already exists
|
||||
BatchOpCreate
|
||||
// BatchOpUpdate updates an existing key, fails if the key doesn't exist
|
||||
BatchOpUpdate
|
||||
// BatchOpDelete removes a key, idempotent (never fails on key state)
|
||||
BatchOpDelete
|
||||
)
|
||||
|
||||
// BatchOp represents a single operation in an atomic batch
|
||||
type BatchOp struct {
|
||||
Mode BatchOpMode
|
||||
Key string
|
||||
Value []byte // For Put/Create/Update operations, nil for Delete
|
||||
}
|
||||
|
||||
// Maximum limit for batch operations
|
||||
const MaxBatchOps = 20
|
||||
|
||||
// ErrKeyAlreadyExists is returned when BatchOpCreate is used on an existing key
|
||||
var ErrKeyAlreadyExists = errors.New("key already exists")
|
||||
|
||||
type KV interface {
|
||||
// Keys returns all the keys in the store
|
||||
Keys(ctx context.Context, section string, opt ListOptions) iter.Seq2[string, error]
|
||||
@@ -60,6 +87,17 @@ type KV interface {
|
||||
// UnixTimestamp returns the current time in seconds since Epoch.
|
||||
// This is used to ensure the server and client are not too far apart in time.
|
||||
UnixTimestamp(ctx context.Context) (int64, error)
|
||||
|
||||
// Batch executes all operations atomically within a single transaction.
|
||||
// If any operation fails, all operations are rolled back.
|
||||
// Operations are executed in order; the batch stops on first failure.
|
||||
//
|
||||
// Operation semantics:
|
||||
// - BatchOpPut: Upsert (create or update), never fails on key state
|
||||
// - BatchOpCreate: Fail with ErrKeyAlreadyExists if key exists
|
||||
// - BatchOpUpdate: Fail with ErrNotFound if key doesn't exist
|
||||
// - BatchOpDelete: Idempotent, never fails on key state
|
||||
Batch(ctx context.Context, section string, ops []BatchOp) error
|
||||
}
|
||||
|
||||
var _ KV = &badgerKV{}
|
||||
@@ -360,3 +398,69 @@ func (k *badgerKV) BatchDelete(ctx context.Context, section string, keys []strin
|
||||
|
||||
return txn.Commit()
|
||||
}
|
||||
|
||||
func (k *badgerKV) Batch(ctx context.Context, section string, ops []BatchOp) error {
|
||||
if k.db.IsClosed() {
|
||||
return fmt.Errorf("database is closed")
|
||||
}
|
||||
|
||||
if section == "" {
|
||||
return fmt.Errorf("section is required")
|
||||
}
|
||||
|
||||
if len(ops) > MaxBatchOps {
|
||||
return fmt.Errorf("too many operations: %d > %d", len(ops), MaxBatchOps)
|
||||
}
|
||||
|
||||
txn := k.db.NewTransaction(true)
|
||||
defer txn.Discard()
|
||||
|
||||
for _, op := range ops {
|
||||
keyWithSection := section + "/" + op.Key
|
||||
|
||||
switch op.Mode {
|
||||
case BatchOpCreate:
|
||||
// Check that key doesn't exist, then set
|
||||
_, err := txn.Get([]byte(keyWithSection))
|
||||
if err == nil {
|
||||
return ErrKeyAlreadyExists
|
||||
}
|
||||
if !errors.Is(err, badger.ErrKeyNotFound) {
|
||||
return err
|
||||
}
|
||||
if err := txn.Set([]byte(keyWithSection), op.Value); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
case BatchOpUpdate:
|
||||
// Check that key exists, then set
|
||||
_, err := txn.Get([]byte(keyWithSection))
|
||||
if errors.Is(err, badger.ErrKeyNotFound) {
|
||||
return ErrNotFound
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := txn.Set([]byte(keyWithSection), op.Value); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
case BatchOpPut:
|
||||
// Upsert: create or update
|
||||
if err := txn.Set([]byte(keyWithSection), op.Value); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
case BatchOpDelete:
|
||||
// Idempotent delete - don't error if not found
|
||||
if err := txn.Delete([]byte(keyWithSection)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
default:
|
||||
return fmt.Errorf("unknown operation mode: %d", op.Mode)
|
||||
}
|
||||
}
|
||||
|
||||
return txn.Commit()
|
||||
}
|
||||
|
||||
@@ -28,6 +28,7 @@ const (
|
||||
TestKVUnixTimestamp = "unix timestamp"
|
||||
TestKVBatchGet = "batch get operations"
|
||||
TestKVBatchDelete = "batch delete operations"
|
||||
TestKVBatch = "batch operations"
|
||||
)
|
||||
|
||||
// NewKVFunc is a function that creates a new KV instance for testing
|
||||
@@ -69,6 +70,7 @@ func RunKVTest(t *testing.T, newKV NewKVFunc, opts *KVTestOptions) {
|
||||
{TestKVUnixTimestamp, runTestKVUnixTimestamp},
|
||||
{TestKVBatchGet, runTestKVBatchGet},
|
||||
{TestKVBatchDelete, runTestKVBatchDelete},
|
||||
{TestKVBatch, runTestKVBatch},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
@@ -801,3 +803,259 @@ func saveKVHelper(t *testing.T, kv resource.KV, ctx context.Context, section, ke
|
||||
err = writer.Close()
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func runTestKVBatch(t *testing.T, kv resource.KV, nsPrefix string) {
|
||||
ctx := testutil.NewTestContext(t, time.Now().Add(30*time.Second))
|
||||
section := nsPrefix + "-batch"
|
||||
|
||||
t.Run("batch with empty section", func(t *testing.T) {
|
||||
err := kv.Batch(ctx, "", nil)
|
||||
assert.Error(t, err)
|
||||
assert.Contains(t, err.Error(), "section is required")
|
||||
})
|
||||
|
||||
t.Run("batch with empty ops succeeds", func(t *testing.T) {
|
||||
err := kv.Batch(ctx, section, nil)
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("batch put creates new key", func(t *testing.T) {
|
||||
ops := []resource.BatchOp{
|
||||
{Mode: resource.BatchOpPut, Key: "put-key", Value: []byte("put-value")},
|
||||
}
|
||||
|
||||
err := kv.Batch(ctx, section, ops)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Verify the key was created
|
||||
reader, err := kv.Get(ctx, section, "put-key")
|
||||
require.NoError(t, err)
|
||||
value, err := io.ReadAll(reader)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "put-value", string(value))
|
||||
err = reader.Close()
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("batch put updates existing key", func(t *testing.T) {
|
||||
// First create a key
|
||||
saveKVHelper(t, kv, ctx, section, "put-update-key", strings.NewReader("original-value"))
|
||||
|
||||
ops := []resource.BatchOp{
|
||||
{Mode: resource.BatchOpPut, Key: "put-update-key", Value: []byte("updated-value")},
|
||||
}
|
||||
|
||||
err := kv.Batch(ctx, section, ops)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Verify the key was updated
|
||||
reader, err := kv.Get(ctx, section, "put-update-key")
|
||||
require.NoError(t, err)
|
||||
value, err := io.ReadAll(reader)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "updated-value", string(value))
|
||||
err = reader.Close()
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("batch create succeeds for new key", func(t *testing.T) {
|
||||
ops := []resource.BatchOp{
|
||||
{Mode: resource.BatchOpCreate, Key: "create-new-key", Value: []byte("new-value")},
|
||||
}
|
||||
|
||||
err := kv.Batch(ctx, section, ops)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Verify the key was created
|
||||
reader, err := kv.Get(ctx, section, "create-new-key")
|
||||
require.NoError(t, err)
|
||||
value, err := io.ReadAll(reader)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "new-value", string(value))
|
||||
err = reader.Close()
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("batch create fails for existing key", func(t *testing.T) {
|
||||
// First create a key
|
||||
saveKVHelper(t, kv, ctx, section, "create-exists-key", strings.NewReader("existing-value"))
|
||||
|
||||
ops := []resource.BatchOp{
|
||||
{Mode: resource.BatchOpCreate, Key: "create-exists-key", Value: []byte("new-value")},
|
||||
}
|
||||
|
||||
err := kv.Batch(ctx, section, ops)
|
||||
assert.ErrorIs(t, err, resource.ErrKeyAlreadyExists)
|
||||
|
||||
// Verify the original value is unchanged
|
||||
reader, err := kv.Get(ctx, section, "create-exists-key")
|
||||
require.NoError(t, err)
|
||||
value, err := io.ReadAll(reader)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "existing-value", string(value))
|
||||
err = reader.Close()
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("batch update succeeds for existing key", func(t *testing.T) {
|
||||
// First create a key
|
||||
saveKVHelper(t, kv, ctx, section, "update-exists-key", strings.NewReader("original-value"))
|
||||
|
||||
ops := []resource.BatchOp{
|
||||
{Mode: resource.BatchOpUpdate, Key: "update-exists-key", Value: []byte("updated-value")},
|
||||
}
|
||||
|
||||
err := kv.Batch(ctx, section, ops)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Verify the key was updated
|
||||
reader, err := kv.Get(ctx, section, "update-exists-key")
|
||||
require.NoError(t, err)
|
||||
value, err := io.ReadAll(reader)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "updated-value", string(value))
|
||||
err = reader.Close()
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("batch update fails for non-existent key", func(t *testing.T) {
|
||||
ops := []resource.BatchOp{
|
||||
{Mode: resource.BatchOpUpdate, Key: "update-nonexistent-key", Value: []byte("new-value")},
|
||||
}
|
||||
|
||||
err := kv.Batch(ctx, section, ops)
|
||||
assert.ErrorIs(t, err, resource.ErrNotFound)
|
||||
|
||||
// Verify the key was not created
|
||||
_, err = kv.Get(ctx, section, "update-nonexistent-key")
|
||||
assert.ErrorIs(t, err, resource.ErrNotFound)
|
||||
})
|
||||
|
||||
t.Run("batch delete removes existing key", func(t *testing.T) {
|
||||
// First create a key
|
||||
saveKVHelper(t, kv, ctx, section, "delete-exists-key", strings.NewReader("to-be-deleted"))
|
||||
|
||||
ops := []resource.BatchOp{
|
||||
{Mode: resource.BatchOpDelete, Key: "delete-exists-key"},
|
||||
}
|
||||
|
||||
err := kv.Batch(ctx, section, ops)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Verify the key was deleted
|
||||
_, err = kv.Get(ctx, section, "delete-exists-key")
|
||||
assert.ErrorIs(t, err, resource.ErrNotFound)
|
||||
})
|
||||
|
||||
t.Run("batch delete is idempotent for non-existent key", func(t *testing.T) {
|
||||
ops := []resource.BatchOp{
|
||||
{Mode: resource.BatchOpDelete, Key: "delete-nonexistent-key"},
|
||||
}
|
||||
|
||||
err := kv.Batch(ctx, section, ops)
|
||||
require.NoError(t, err) // Should succeed even though key doesn't exist
|
||||
})
|
||||
|
||||
t.Run("batch multiple operations atomic success", func(t *testing.T) {
|
||||
ops := []resource.BatchOp{
|
||||
{Mode: resource.BatchOpPut, Key: "multi-key1", Value: []byte("value1")},
|
||||
{Mode: resource.BatchOpPut, Key: "multi-key2", Value: []byte("value2")},
|
||||
{Mode: resource.BatchOpPut, Key: "multi-key3", Value: []byte("value3")},
|
||||
}
|
||||
|
||||
err := kv.Batch(ctx, section, ops)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Verify all keys were created
|
||||
for i := 1; i <= 3; i++ {
|
||||
key := fmt.Sprintf("multi-key%d", i)
|
||||
reader, err := kv.Get(ctx, section, key)
|
||||
require.NoError(t, err)
|
||||
value, err := io.ReadAll(reader)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, fmt.Sprintf("value%d", i), string(value))
|
||||
err = reader.Close()
|
||||
require.NoError(t, err)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("batch multiple operations atomic rollback on failure", func(t *testing.T) {
|
||||
// First create a key that will cause the batch to fail
|
||||
saveKVHelper(t, kv, ctx, section, "rollback-exists", strings.NewReader("existing"))
|
||||
|
||||
ops := []resource.BatchOp{
|
||||
{Mode: resource.BatchOpPut, Key: "rollback-new1", Value: []byte("value1")},
|
||||
{Mode: resource.BatchOpCreate, Key: "rollback-exists", Value: []byte("should-fail")}, // This will fail
|
||||
{Mode: resource.BatchOpPut, Key: "rollback-new2", Value: []byte("value2")},
|
||||
}
|
||||
|
||||
err := kv.Batch(ctx, section, ops)
|
||||
assert.ErrorIs(t, err, resource.ErrKeyAlreadyExists)
|
||||
|
||||
// Verify rollback: the first operation should NOT have persisted
|
||||
_, err = kv.Get(ctx, section, "rollback-new1")
|
||||
assert.ErrorIs(t, err, resource.ErrNotFound)
|
||||
|
||||
// Verify the third operation was not executed
|
||||
_, err = kv.Get(ctx, section, "rollback-new2")
|
||||
assert.ErrorIs(t, err, resource.ErrNotFound)
|
||||
})
|
||||
|
||||
t.Run("batch mixed operations", func(t *testing.T) {
|
||||
// Setup: create a key to update and one to delete
|
||||
saveKVHelper(t, kv, ctx, section, "mixed-update", strings.NewReader("original"))
|
||||
saveKVHelper(t, kv, ctx, section, "mixed-delete", strings.NewReader("to-delete"))
|
||||
|
||||
ops := []resource.BatchOp{
|
||||
{Mode: resource.BatchOpCreate, Key: "mixed-create", Value: []byte("created")},
|
||||
{Mode: resource.BatchOpUpdate, Key: "mixed-update", Value: []byte("updated")},
|
||||
{Mode: resource.BatchOpDelete, Key: "mixed-delete"},
|
||||
{Mode: resource.BatchOpPut, Key: "mixed-put", Value: []byte("put")},
|
||||
}
|
||||
|
||||
err := kv.Batch(ctx, section, ops)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Verify create
|
||||
reader, err := kv.Get(ctx, section, "mixed-create")
|
||||
require.NoError(t, err)
|
||||
value, err := io.ReadAll(reader)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "created", string(value))
|
||||
err = reader.Close()
|
||||
require.NoError(t, err)
|
||||
|
||||
// Verify update
|
||||
reader, err = kv.Get(ctx, section, "mixed-update")
|
||||
require.NoError(t, err)
|
||||
value, err = io.ReadAll(reader)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "updated", string(value))
|
||||
err = reader.Close()
|
||||
require.NoError(t, err)
|
||||
|
||||
// Verify delete
|
||||
_, err = kv.Get(ctx, section, "mixed-delete")
|
||||
assert.ErrorIs(t, err, resource.ErrNotFound)
|
||||
|
||||
// Verify put
|
||||
reader, err = kv.Get(ctx, section, "mixed-put")
|
||||
require.NoError(t, err)
|
||||
value, err = io.ReadAll(reader)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "put", string(value))
|
||||
err = reader.Close()
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("batch too many operations", func(t *testing.T) {
|
||||
ops := make([]resource.BatchOp, resource.MaxBatchOps+1)
|
||||
for i := range ops {
|
||||
ops[i] = resource.BatchOp{Mode: resource.BatchOpPut, Key: fmt.Sprintf("key-%d", i), Value: []byte("value")}
|
||||
}
|
||||
|
||||
err := kv.Batch(ctx, section, ops)
|
||||
assert.Error(t, err)
|
||||
assert.Contains(t, err.Error(), "too many operations")
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user