Compare commits

...

7 Commits

Author SHA1 Message Date
Georges Chaudy
f634f37b5f Enhance SQL KV store with snowflake ID support and resource version handling
- Updated the parsedKey struct to include ResourceVersionSnowflake for storing the original snowflake ID.
- Implemented snowflakeToMicroseconds function to convert snowflake IDs to microsecond timestamps for database storage.
- Modified parseDataKey function to utilize the new microsecond timestamp for ResourceVersion.
- Enhanced SQL queries in upsertResourceVersion to conditionally update resource versions based on the new timestamp logic.

These changes improve the accuracy and precision of resource versioning in the SQL KV store, enhancing data integrity and management.
2025-11-18 12:42:56 +01:00
Georges Chaudy
f5ef25233d Enhance SQL KV store and backend diagnostics
- Updated sqlKV struct to maintain a reference to the dbProvider, preventing garbage collection of the database connection.
- Implemented Ping and checkDB methods in sqlKV for verifying database connection health.
- Enhanced kvStorageBackend to implement the DiagnosticsServer interface, adding IsHealthy method for health checks.
- Updated resource server to utilize the new diagnostics capabilities of kvStorageBackend.

These changes improve the reliability and maintainability of the SQL-based KV store and its integration with the resource server.
2025-11-18 12:12:05 +01:00
Georges Chaudy
e156dfd92a Implement lifecycle management for kvStorageBackend
- Added context and cancel function to kvStorageBackend for lifecycle management.
- Implemented Init and Stop methods to manage background tasks and context cancellation.
- Updated resource server to utilize the kvStorageBackend's lifecycle hooks.

This change enhances the management of background processes within the kvStorageBackend, improving resource handling and cleanup.
2025-11-18 11:40:48 +01:00
Georges Chaudy
fa3d906d41 Add unifiedStorageKVBackend feature toggle
- Introduced the `unifiedStorageKVBackend` feature toggle to enable the use of a KV-backed SQL storage backend instead of direct SQL queries.
- Updated relevant files to include the new feature toggle in the registry, CSV, JSON, and Go definitions.
- Enhanced the resource server logic to conditionally use the KV backend based on the feature flag.

This change expands the storage options available for the application, improving flexibility in data management.
2025-11-18 11:35:17 +01:00
Georges Chaudy
a8c86de2d6 Add SQL-based KV store implementation
- Introduced a new `sqlkv.go` file implementing the KV interface using SQL storage.
- Added methods for creating, retrieving, updating, and deleting key-value pairs in a SQL database.
- Implemented support for multiple SQL dialects (MySQL, PostgreSQL, SQLite).
- Included functionality for batch operations and key parsing.

This change lays the foundation for a robust SQL-based key-value storage solution, enhancing data management capabilities.
2025-11-18 11:27:10 +01:00
Georges Chaudy
761a8f6c31 Enhance resource_history SQL update to include key_path column
- Updated SQL to set key_path based on resource attributes and action type.
- Removed the migration to make key_path NOT NULL, keeping it nullable to accommodate specific write patterns.
- Adjusted comments to clarify the rationale behind key_path's nullable status.

This change improves the handling of resource history updates and maintains flexibility in data management.
2025-11-17 17:39:50 +01:00
Georges Chaudy
3c0fc606ae Add key_path column and backfill for resource_history table
- Introduced key_path column to resource_history for KV interface.
- Implemented backfill logic for key_path using a new migrator.
- Made key_path column NOT NULL after backfill completion.
- Added index on key_path column for improved query performance.
- Created resource_events table to support KV interface.

This change enhances the resource history management and optimizes data retrieval.
2025-11-17 14:57:18 +01:00
10 changed files with 1165 additions and 25 deletions

View File

@@ -912,6 +912,10 @@ export interface FeatureToggles {
*/
unifiedStorageGrpcConnectionPool?: boolean;
/**
* Use KV-backed SQL storage backend instead of direct SQL queries
*/
unifiedStorageKVBackend?: boolean;
/**
* Enables UI functionality to permanently delete alert rules
* @default true
*/

View File

@@ -1580,6 +1580,14 @@ var (
HideFromAdminPage: true,
HideFromDocs: true,
},
{
Name: "unifiedStorageKVBackend",
Description: "Use KV-backed SQL storage backend instead of direct SQL queries",
Stage: FeatureStageExperimental,
Owner: grafanaSearchAndStorageSquad,
HideFromAdminPage: true,
HideFromDocs: true,
},
{
Name: "alertingRulePermanentlyDelete",
Description: "Enables UI functionality to permanently delete alert rules",

View File

@@ -205,6 +205,7 @@ unifiedStorageHistoryPruner,GA,@grafana/search-and-storage,false,false,false
azureMonitorLogsBuilderEditor,preview,@grafana/partner-datasources,false,false,false
localeFormatPreference,preview,@grafana/grafana-frontend-platform,false,false,false
unifiedStorageGrpcConnectionPool,experimental,@grafana/search-and-storage,false,false,false
unifiedStorageKVBackend,experimental,@grafana/search-and-storage,false,false,false
alertingRulePermanentlyDelete,GA,@grafana/alerting-squad,false,false,true
alertingRuleRecoverDeleted,GA,@grafana/alerting-squad,false,false,true
multiTenantTempCredentials,experimental,@grafana/aws-datasources,false,false,false
1 Name Stage Owner requiresDevMode RequiresRestart FrontendOnly
205 azureMonitorLogsBuilderEditor preview @grafana/partner-datasources false false false
206 localeFormatPreference preview @grafana/grafana-frontend-platform false false false
207 unifiedStorageGrpcConnectionPool experimental @grafana/search-and-storage false false false
208 unifiedStorageKVBackend experimental @grafana/search-and-storage false false false
209 alertingRulePermanentlyDelete GA @grafana/alerting-squad false false true
210 alertingRuleRecoverDeleted GA @grafana/alerting-squad false false true
211 multiTenantTempCredentials experimental @grafana/aws-datasources false false false

View File

@@ -830,6 +830,10 @@ const (
// Enables the unified storage grpc connection pool
FlagUnifiedStorageGrpcConnectionPool = "unifiedStorageGrpcConnectionPool"
// FlagUnifiedStorageKVBackend
// Use KV-backed SQL storage backend instead of direct SQL queries
FlagUnifiedStorageKVBackend = "unifiedStorageKVBackend"
// FlagAlertingRulePermanentlyDelete
// Enables UI functionality to permanently delete alert rules
FlagAlertingRulePermanentlyDelete = "alertingRulePermanentlyDelete"

View File

@@ -4208,6 +4208,20 @@
"expression": "true"
}
},
{
"metadata": {
"name": "unifiedStorageKVBackend",
"resourceVersion": "1763461706359",
"creationTimestamp": "2025-11-18T10:28:26Z"
},
"spec": {
"description": "Use KV-backed SQL storage backend instead of direct SQL queries",
"stage": "experimental",
"codeowner": "@grafana/search-and-storage",
"hideFromAdminPage": true,
"hideFromDocs": true
}
},
{
"metadata": {
"name": "unifiedStorageSearch",

View File

@@ -0,0 +1,835 @@
package resource
import (
"bytes"
"context"
"database/sql"
"errors"
"fmt"
"io"
"iter"
"strconv"
"strings"
"time"
"github.com/bwmarrin/snowflake"
"github.com/google/uuid"
"github.com/grafana/grafana/pkg/storage/unified/sql/db"
"github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate"
)
// Logical KV section names used by callers; each maps to a dedicated SQL
// table (see getTableName).
const (
	sectionData   = "unified/data"   // backed by the resource_history table
	sectionEvents = "unified/events" // backed by the resource_events table
)
// sqlKV implements the KV interface using SQL storage
type sqlKV struct {
	dbProvider db.DBProvider       // Keep reference to prevent GC
	db         db.DB               // initialized connection obtained from dbProvider
	dialect    sqltemplate.Dialect // dialect selected from the driver name (mysql/postgres/sqlite)
}
// NewSQLKV creates a new SQL-based KV store.
// It initializes the database connection via the provider and selects the
// SQL dialect matching the driver; unknown drivers are rejected.
func NewSQLKV(dbProvider db.DBProvider) (KV, error) {
	if dbProvider == nil {
		return nil, errors.New("dbProvider is required")
	}

	// Initialize the database connection.
	conn, err := dbProvider.Init(context.Background())
	if err != nil {
		return nil, fmt.Errorf("initialize DB: %w", err)
	}

	// Map the driver name to a SQL dialect.
	var dialect sqltemplate.Dialect
	switch driver := conn.DriverName(); driver {
	case "mysql":
		dialect = sqltemplate.MySQL
	case "postgres":
		dialect = sqltemplate.PostgreSQL
	case "sqlite3", "sqlite":
		dialect = sqltemplate.SQLite
	default:
		return nil, fmt.Errorf("unsupported database driver: %s", driver)
	}

	store := &sqlKV{
		dbProvider: dbProvider, // Keep reference to prevent GC from closing the database
		db:         conn,
		dialect:    dialect,
	}
	return store, nil
}
// Verify that sqlKV implements KV interface
var _ KV = &sqlKV{}

// Helper function to build identifiers safely
// ident quotes/escapes a SQL identifier using the active dialect so it can
// be interpolated into query strings without identifier injection.
func (k *sqlKV) ident(name string) (string, error) {
	return k.dialect.Ident(name)
}
// getTableName maps a logical KV section to its dialect-quoted SQL table
// name. Unknown sections produce an error.
func (k *sqlKV) getTableName(section string) (string, error) {
	if section == sectionData {
		return k.ident("resource_history")
	}
	if section == sectionEvents {
		return k.ident("resource_events")
	}
	return "", fmt.Errorf("unsupported section: %s", section)
}
// parsedKey represents the components of a key_path for the data section
// Format: {Group}/{Resource}/{Namespace}/{Name}/{ResourceVersion}~{Action}~{Folder}
type parsedKey struct {
	Group     string // first path segment of the key
	Resource  string // second path segment
	Namespace string // third path segment
	Name      string // fourth path segment
	// ResourceVersion is the microsecond timestamp derived from the snowflake
	// ID embedded in the key; this is the value written to the database.
	ResourceVersion int64 // Microsecond timestamp
	// ResourceVersionSnowflake preserves the untranslated snowflake ID taken
	// from the key itself.
	ResourceVersionSnowflake int64 // Original snowflake ID from key
	Action                   int   // 1: create, 2: update, 3: delete
	Folder                   string // trailing segment after the second '~'; may be empty
}
// snowflakeToMicroseconds converts a snowflake ID to a unix microsecond timestamp
// Uses the snowflake library's Time() method which handles the Grafana epoch internally
//
// NOTE(review): the sequence field occupies 12 bits (0..4095), so the value
// added below can exceed 999 "microseconds" and bleed into the next
// millisecond bucket — confirm that consumers of this encoding (and any
// backfill SQL that reverses it) tolerate sequence values above 999.
func snowflakeToMicroseconds(snowflakeID int64) int64 {
	// Extract unix milliseconds from snowflake (handles Grafana epoch internally)
	unixMilliseconds := snowflake.ID(snowflakeID).Time()
	// Extract sequence number (low 12 bits) for sub-millisecond precision
	sequence := snowflakeID & 0xFFF // 0xFFF = 4095 = 2^12 - 1
	// Convert to unix microseconds: (unix_ms * 1000) + sequence
	return (unixMilliseconds * 1000) + sequence
}
// parseDataKey parses a data-section key_path of the form
// {Group}/{Resource}/{Namespace}/{Name}/{SnowflakeRV}~{action}~{Folder}
// into its components, converting the snowflake resource version into the
// microsecond timestamp used for database storage.
func parseDataKey(keyPath string) (*parsedKey, error) {
	// The '~' separator splits the main key from the action and folder parts.
	segments := strings.Split(keyPath, "~")
	if len(segments) != 3 {
		return nil, fmt.Errorf("invalid key format: expected 3 parts separated by '~', got %d", len(segments))
	}

	// The main key is '/'-separated into exactly five fields.
	fields := strings.Split(segments[0], "/")
	if len(fields) != 5 {
		return nil, fmt.Errorf("invalid key format: expected 5 parts separated by '/', got %d", len(fields))
	}

	// The resource version in the key is a snowflake ID.
	snowflakeID, err := strconv.ParseInt(fields[4], 10, 64)
	if err != nil {
		return nil, fmt.Errorf("invalid resource_version: %w", err)
	}

	// Translate the action label into its numeric code.
	var action int
	switch segments[1] {
	case "created":
		action = 1
	case "updated":
		action = 2
	case "deleted":
		action = 3
	default:
		return nil, fmt.Errorf("invalid action: %s", segments[1])
	}

	key := &parsedKey{
		Group:                    fields[0],
		Resource:                 fields[1],
		Namespace:                fields[2],
		Name:                     fields[3],
		ResourceVersion:          snowflakeToMicroseconds(snowflakeID), // microsecond timestamp for the DB
		ResourceVersionSnowflake: snowflakeID,                          // original snowflake ID
		Action:                   action,
		Folder:                   segments[2], // may be empty
	}
	return key, nil
}
// Get retrieves the value for a key from the store.
// It returns ErrNotFound when no row exists for the key_path in the
// section's backing table.
func (k *sqlKV) Get(ctx context.Context, section string, key string) (io.ReadCloser, error) {
	if section == "" {
		return nil, fmt.Errorf("section is required")
	}
	if key == "" {
		return nil, fmt.Errorf("key is required")
	}

	table, err := k.getTableName(section)
	if err != nil {
		return nil, err
	}

	// Quote the column identifiers for the active dialect.
	colValue, err := k.ident("value")
	if err != nil {
		return nil, fmt.Errorf("invalid column identifier: %w", err)
	}
	colKeyPath, err := k.ident("key_path")
	if err != nil {
		return nil, fmt.Errorf("invalid column identifier: %w", err)
	}

	query := fmt.Sprintf("SELECT %s FROM %s WHERE %s = %s",
		colValue, table, colKeyPath, k.dialect.ArgPlaceholder(1))

	// Fetch the single row, translating "no rows" into ErrNotFound.
	var raw []byte
	if err := k.db.QueryRowContext(ctx, query, key).Scan(&raw); err != nil {
		if errors.Is(err, sql.ErrNoRows) {
			return nil, ErrNotFound
		}
		return nil, fmt.Errorf("query failed: %w", err)
	}
	return io.NopCloser(bytes.NewReader(raw)), nil
}
// BatchGet retrieves multiple values for the given keys from the store
// Uses a UNION ALL subquery with LEFT JOIN to preserve order and enable streaming
//
// Results are yielded in the same order as the input keys; keys with no
// matching row are silently skipped. Errors (including per-row scan errors)
// are yielded once and terminate the sequence.
func (k *sqlKV) BatchGet(ctx context.Context, section string, keys []string) iter.Seq2[KeyValue, error] {
	if section == "" {
		return func(yield func(KeyValue, error) bool) {
			yield(KeyValue{}, fmt.Errorf("section is required"))
		}
	}
	if len(keys) == 0 {
		return func(yield func(KeyValue, error) bool) {
			// Empty result set - nothing to yield
		}
	}
	return func(yield func(KeyValue, error) bool) {
		tableName, err := k.getTableName(section)
		if err != nil {
			yield(KeyValue{}, err)
			return
		}
		keyPathIdent, err := k.ident("key_path")
		if err != nil {
			yield(KeyValue{}, fmt.Errorf("invalid column identifier: %w", err))
			return
		}
		valueIdent, err := k.ident("value")
		if err != nil {
			yield(KeyValue{}, fmt.Errorf("invalid column identifier: %w", err))
			return
		}
		// Build UNION ALL subquery to preserve key order
		// SELECT 0 AS idx, ? AS kp UNION ALL SELECT 1, ? UNION ALL ...
		// Each key contributes two placeholders: its position and its value.
		var unionParts []string
		var args []interface{}
		argNum := 1
		for i, key := range keys {
			if i == 0 {
				// First SELECT names the columns; subsequent ones inherit them.
				unionParts = append(unionParts, fmt.Sprintf(
					"SELECT %s AS idx, %s AS kp",
					k.dialect.ArgPlaceholder(argNum),
					k.dialect.ArgPlaceholder(argNum+1),
				))
			} else {
				unionParts = append(unionParts, fmt.Sprintf(
					"UNION ALL SELECT %s, %s",
					k.dialect.ArgPlaceholder(argNum),
					k.dialect.ArgPlaceholder(argNum+1),
				))
			}
			args = append(args, i, key)
			argNum += 2
		}
		// Build the full query with LEFT JOIN to preserve order
		// This allows streaming results directly without buffering
		query := fmt.Sprintf(
			"SELECT v.idx, t.%s, t.%s FROM (%s) AS v LEFT JOIN %s t ON t.%s = v.kp ORDER BY v.idx",
			keyPathIdent,
			valueIdent,
			strings.Join(unionParts, " "),
			tableName,
			keyPathIdent,
		)
		// Execute the query
		rows, err := k.db.QueryContext(ctx, query, args...)
		if err != nil {
			yield(KeyValue{}, fmt.Errorf("query failed: %w", err))
			return
		}
		defer rows.Close()
		// Stream results directly - no buffering needed!
		// Results come back in the order specified by idx
		for rows.Next() {
			var idx int
			var keyPath sql.NullString
			var value []byte
			if err := rows.Scan(&idx, &keyPath, &value); err != nil {
				yield(KeyValue{}, fmt.Errorf("scan failed: %w", err))
				return
			}
			// Skip keys that don't exist (LEFT JOIN returns NULL)
			if !keyPath.Valid {
				continue
			}
			kv := KeyValue{
				Key:   keyPath.String,
				Value: io.NopCloser(bytes.NewReader(value)),
			}
			// Stop early if the consumer breaks out of the iteration.
			if !yield(kv, nil) {
				return
			}
		}
		if err := rows.Err(); err != nil {
			yield(KeyValue{}, fmt.Errorf("rows error: %w", err))
			return
		}
	}
}
// Keys returns all the keys in the store
//
// The listing honors ListOptions: StartKey (inclusive lower bound), EndKey
// (exclusive upper bound), Sort direction, and Limit (applied only when > 0).
// Keys are streamed from the database; an error terminates the sequence.
func (k *sqlKV) Keys(ctx context.Context, section string, opt ListOptions) iter.Seq2[string, error] {
	if section == "" {
		return func(yield func(string, error) bool) {
			yield("", fmt.Errorf("section is required"))
		}
	}
	return func(yield func(string, error) bool) {
		tableName, err := k.getTableName(section)
		if err != nil {
			yield("", err)
			return
		}
		keyPathIdent, err := k.ident("key_path")
		if err != nil {
			yield("", fmt.Errorf("invalid column identifier: %w", err))
			return
		}
		// Build WHERE clauses
		var whereClauses []string
		var args []interface{}
		argNum := 1
		// Start key (inclusive)
		if opt.StartKey != "" {
			whereClauses = append(whereClauses, fmt.Sprintf("%s >= %s", keyPathIdent, k.dialect.ArgPlaceholder(argNum)))
			args = append(args, opt.StartKey)
			argNum++
		}
		// End key (exclusive)
		if opt.EndKey != "" {
			whereClauses = append(whereClauses, fmt.Sprintf("%s < %s", keyPathIdent, k.dialect.ArgPlaceholder(argNum)))
			args = append(args, opt.EndKey)
			argNum++
		}
		// Build ORDER BY clause (ascending unless descending is requested)
		orderBy := "ASC"
		if opt.Sort == SortOrderDesc {
			orderBy = "DESC"
		}
		// Build the query
		query := fmt.Sprintf(
			"SELECT %s FROM %s",
			keyPathIdent,
			tableName,
		)
		if len(whereClauses) > 0 {
			query += " WHERE " + strings.Join(whereClauses, " AND ")
		}
		query += fmt.Sprintf(" ORDER BY %s %s", keyPathIdent, orderBy)
		// Limit is interpolated directly; it is an integer, so this is safe.
		if opt.Limit > 0 {
			query += fmt.Sprintf(" LIMIT %d", opt.Limit)
		}
		// Execute the query
		rows, err := k.db.QueryContext(ctx, query, args...)
		if err != nil {
			yield("", fmt.Errorf("query failed: %w", err))
			return
		}
		defer rows.Close()
		// Yield each key; stop early if the consumer breaks out.
		for rows.Next() {
			var keyPath string
			if err := rows.Scan(&keyPath); err != nil {
				yield("", fmt.Errorf("scan failed: %w", err))
				return
			}
			if !yield(keyPath, nil) {
				return
			}
		}
		if err := rows.Err(); err != nil {
			yield("", fmt.Errorf("rows error: %w", err))
			return
		}
	}
}
// Save a new value - returns a WriteCloser to write the value to.
// Data is buffered in memory and only persisted when the returned writer
// is closed (see sqlWriteCloser.Close).
func (k *sqlKV) Save(ctx context.Context, section string, key string) (io.WriteCloser, error) {
	if section == "" {
		return nil, fmt.Errorf("section is required")
	}
	if key == "" {
		return nil, fmt.Errorf("key is required")
	}

	writer := &sqlWriteCloser{
		kv:      k,
		ctx:     ctx,
		section: section,
		key:     key,
		buf:     &bytes.Buffer{},
		closed:  false,
	}
	return writer, nil
}
// sqlWriteCloser implements io.WriteCloser for SQL KV Save operations
//
// Writes accumulate in an in-memory buffer; nothing touches the database
// until Close. The context is stored on the struct because io.Closer takes
// no context parameter (normally discouraged, but scoped here to a single
// Save operation).
type sqlWriteCloser struct {
	kv      *sqlKV          // parent store providing db handle and dialect
	ctx     context.Context // used for the INSERT/UPDATE issued at Close time
	section string          // target section (data or events)
	key     string          // key_path to store the value under
	buf     *bytes.Buffer   // buffers written bytes until Close
	closed  bool            // guards against write-after-close and double close
}
// Write implements io.Writer by appending to the in-memory buffer.
// Writing after Close is an error.
func (w *sqlWriteCloser) Write(p []byte) (int, error) {
	if w.closed {
		return 0, fmt.Errorf("write to closed writer")
	}
	n, err := w.buf.Write(p)
	return n, err
}
// Close implements io.Closer - stores the buffered data in SQL.
// Closing twice is a no-op. The events section uses a simple upsert; the
// data section runs a multi-table transaction.
func (w *sqlWriteCloser) Close() error {
	if w.closed {
		return nil
	}
	w.closed = true

	data := w.buf.Bytes()
	if w.section == sectionEvents {
		// Simple upsert for events section
		return w.closeEvents(data)
	}
	if w.section == sectionData {
		// Complex multi-table transaction for data section
		return w.closeData(data)
	}
	return fmt.Errorf("unsupported section: %s", w.section)
}
// closeEvents handles the simple upsert for the events section
//
// Inserts (key_path, value) into the events table, overwriting the value if
// the key already exists. The upsert syntax differs per dialect, hence the
// switch below.
func (w *sqlWriteCloser) closeEvents(value []byte) error {
	tableName, err := w.kv.getTableName(w.section)
	if err != nil {
		return err
	}
	keyPathIdent, err := w.kv.ident("key_path")
	if err != nil {
		return fmt.Errorf("invalid column identifier: %w", err)
	}
	valueIdent, err := w.kv.ident("value")
	if err != nil {
		return fmt.Errorf("invalid column identifier: %w", err)
	}
	ph1 := w.kv.dialect.ArgPlaceholder(1)
	ph2 := w.kv.dialect.ArgPlaceholder(2)
	var query string
	switch w.kv.dialect.DialectName() {
	case "postgres":
		// Postgres: ON CONFLICT ... DO UPDATE with EXCLUDED pseudo-table.
		query = fmt.Sprintf(
			"INSERT INTO %s (%s, %s) VALUES (%s, %s) ON CONFLICT (%s) DO UPDATE SET %s = EXCLUDED.%s",
			tableName, keyPathIdent, valueIdent, ph1, ph2, keyPathIdent, valueIdent, valueIdent,
		)
	case "mysql":
		// MySQL: ON DUPLICATE KEY UPDATE with VALUES().
		query = fmt.Sprintf(
			"INSERT INTO %s (%s, %s) VALUES (%s, %s) ON DUPLICATE KEY UPDATE %s = VALUES(%s)",
			tableName, keyPathIdent, valueIdent, ph1, ph2, valueIdent, valueIdent,
		)
	case "sqlite":
		// SQLite: same shape as Postgres but with lowercase excluded.
		query = fmt.Sprintf(
			"INSERT INTO %s (%s, %s) VALUES (%s, %s) ON CONFLICT (%s) DO UPDATE SET %s = excluded.%s",
			tableName, keyPathIdent, valueIdent, ph1, ph2, keyPathIdent, valueIdent, valueIdent,
		)
	default:
		return fmt.Errorf("unsupported dialect: %s", w.kv.dialect.DialectName())
	}
	_, err = w.kv.db.ExecContext(w.ctx, query, w.key, value)
	if err != nil {
		return fmt.Errorf("insert/update failed: %w", err)
	}
	return nil
}
// closeData handles the complex multi-table transaction for the data section
//
// Within a single transaction it (1) writes the history row, (2) updates or
// deletes the current-state resource row depending on the action, and
// (3) bumps the per-(group, resource) resource_version counter.
func (w *sqlWriteCloser) closeData(value []byte) error {
	// Parse the key to extract all fields
	parsed, err := parseDataKey(w.key)
	if err != nil {
		return fmt.Errorf("parse key: %w", err)
	}
	// Generate a GUID for this write
	guid := uuid.New().String()
	// Execute all operations in a transaction
	return w.kv.db.WithTx(w.ctx, nil, func(ctx context.Context, tx db.Tx) error {
		// 1. Insert/update resource_history
		if err := w.upsertResourceHistory(ctx, tx, parsed, guid, value); err != nil {
			return fmt.Errorf("upsert resource_history: %w", err)
		}
		// 2. Handle resource table based on action
		if parsed.Action == 3 { // deleted
			if err := w.deleteResource(ctx, tx, parsed); err != nil {
				return fmt.Errorf("delete resource: %w", err)
			}
		} else { // created or updated
			if err := w.upsertResource(ctx, tx, parsed, guid, value); err != nil {
				return fmt.Errorf("upsert resource: %w", err)
			}
		}
		// 3. Upsert resource_version table
		if err := w.upsertResourceVersion(ctx, tx, parsed); err != nil {
			return fmt.Errorf("upsert resource_version: %w", err)
		}
		return nil
	})
}
// upsertResourceHistory inserts/updates a row in the resource_history table
//
// On conflict (same guid for Postgres; unique-key collision for MySQL/SQLite
// via REPLACE) the value and key_path are overwritten.
// previous_resource_version is always written as the literal 0 here.
func (w *sqlWriteCloser) upsertResourceHistory(ctx context.Context, tx db.Tx, parsed *parsedKey, guid string, value []byte) error {
	// Build identifiers
	// NOTE(review): errors from ident() are discarded below; a dialect that
	// rejects one of these names would yield an empty identifier and a
	// malformed query — consider propagating these errors.
	tableIdent, _ := w.kv.ident("resource_history")
	guidIdent, _ := w.kv.ident("guid")
	groupIdent, _ := w.kv.ident("group")
	resourceIdent, _ := w.kv.ident("resource")
	namespaceIdent, _ := w.kv.ident("namespace")
	nameIdent, _ := w.kv.ident("name")
	rvIdent, _ := w.kv.ident("resource_version")
	prevRVIdent, _ := w.kv.ident("previous_resource_version")
	valueIdent, _ := w.kv.ident("value")
	actionIdent, _ := w.kv.ident("action")
	folderIdent, _ := w.kv.ident("folder")
	keyPathIdent, _ := w.kv.ident("key_path")
	// Build placeholders
	var query string
	ph := func(n int) string { return w.kv.dialect.ArgPlaceholder(n) }
	switch w.kv.dialect.DialectName() {
	case "postgres":
		// 11 columns, 10 placeholders (previous_resource_version is the
		// literal 0 in the VALUES list).
		query = fmt.Sprintf(`
INSERT INTO %s (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
VALUES (%s, %s, %s, %s, %s, %s, %s, 0, %s, %s, %s)
ON CONFLICT (%s) DO UPDATE SET %s = EXCLUDED.%s, %s = EXCLUDED.%s`,
			tableIdent, guidIdent, keyPathIdent, groupIdent, resourceIdent, namespaceIdent, nameIdent,
			rvIdent, prevRVIdent, valueIdent, actionIdent, folderIdent,
			ph(1), ph(2), ph(3), ph(4), ph(5), ph(6), ph(7), ph(8), ph(9), ph(10),
			guidIdent, valueIdent, valueIdent, keyPathIdent, keyPathIdent,
		)
	case "mysql", "sqlite":
		// For MySQL and SQLite, use INSERT OR REPLACE (requires all columns)
		// NOTE(review): REPLACE deletes the existing row and inserts a new
		// one — verify this is acceptable w.r.t. triggers/foreign keys.
		query = fmt.Sprintf(`
REPLACE INTO %s (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
VALUES (%s, %s, %s, %s, %s, %s, %s, 0, %s, %s, %s)`,
			tableIdent, guidIdent, keyPathIdent, groupIdent, resourceIdent, namespaceIdent, nameIdent,
			rvIdent, prevRVIdent, valueIdent, actionIdent, folderIdent,
			ph(1), ph(2), ph(3), ph(4), ph(5), ph(6), ph(7), ph(8), ph(9), ph(10),
		)
	default:
		return fmt.Errorf("unsupported dialect: %s", w.kv.dialect.DialectName())
	}
	_, err := tx.ExecContext(ctx, query,
		guid, w.key, parsed.Group, parsed.Resource, parsed.Namespace, parsed.Name,
		parsed.ResourceVersion, value, parsed.Action, parsed.Folder,
	)
	return err
}
// upsertResource inserts/updates a row in the resource table
//
// The resource table holds the current state of each object keyed by
// (namespace, group, resource, name); on conflict the guid, resource_version,
// value, and action columns are refreshed.
func (w *sqlWriteCloser) upsertResource(ctx context.Context, tx db.Tx, parsed *parsedKey, guid string, value []byte) error {
	// Build identifiers
	// NOTE(review): ident() errors are discarded here (same concern as in
	// upsertResourceHistory).
	tableIdent, _ := w.kv.ident("resource")
	guidIdent, _ := w.kv.ident("guid")
	groupIdent, _ := w.kv.ident("group")
	resourceIdent, _ := w.kv.ident("resource")
	namespaceIdent, _ := w.kv.ident("namespace")
	nameIdent, _ := w.kv.ident("name")
	rvIdent, _ := w.kv.ident("resource_version")
	valueIdent, _ := w.kv.ident("value")
	actionIdent, _ := w.kv.ident("action")
	var query string
	ph := func(n int) string { return w.kv.dialect.ArgPlaceholder(n) }
	switch w.kv.dialect.DialectName() {
	case "postgres":
		query = fmt.Sprintf(`
INSERT INTO %s (%s, %s, %s, %s, %s, %s, %s, %s)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (%s, %s, %s, %s) DO UPDATE SET
%s = EXCLUDED.%s, %s = EXCLUDED.%s, %s = EXCLUDED.%s, %s = EXCLUDED.%s`,
			tableIdent, guidIdent, groupIdent, resourceIdent, namespaceIdent, nameIdent, rvIdent, valueIdent, actionIdent,
			ph(1), ph(2), ph(3), ph(4), ph(5), ph(6), ph(7), ph(8),
			namespaceIdent, groupIdent, resourceIdent, nameIdent,
			guidIdent, guidIdent, rvIdent, rvIdent, valueIdent, valueIdent, actionIdent, actionIdent,
		)
	case "mysql", "sqlite":
		// NOTE(review): REPLACE semantics delete-then-insert the row on a
		// unique-key collision — confirm triggers/FKs on the resource table
		// tolerate this.
		query = fmt.Sprintf(`
REPLACE INTO %s (%s, %s, %s, %s, %s, %s, %s, %s)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)`,
			tableIdent, guidIdent, groupIdent, resourceIdent, namespaceIdent, nameIdent, rvIdent, valueIdent, actionIdent,
			ph(1), ph(2), ph(3), ph(4), ph(5), ph(6), ph(7), ph(8),
		)
	default:
		return fmt.Errorf("unsupported dialect: %s", w.kv.dialect.DialectName())
	}
	_, err := tx.ExecContext(ctx, query,
		guid, parsed.Group, parsed.Resource, parsed.Namespace, parsed.Name,
		parsed.ResourceVersion, value, parsed.Action,
	)
	return err
}
// deleteResource deletes the current-state row from the resource table for
// the object identified by (group, resource, namespace, name).
func (w *sqlWriteCloser) deleteResource(ctx context.Context, tx db.Tx, parsed *parsedKey) error {
	// Quote the identifiers for the active dialect (errors intentionally
	// ignored, matching the sibling upsert helpers).
	table, _ := w.kv.ident("resource")
	colGroup, _ := w.kv.ident("group")
	colResource, _ := w.kv.ident("resource")
	colNamespace, _ := w.kv.ident("namespace")
	colName, _ := w.kv.ident("name")

	arg := func(n int) string { return w.kv.dialect.ArgPlaceholder(n) }
	query := fmt.Sprintf(`
DELETE FROM %s WHERE %s = %s AND %s = %s AND %s = %s AND %s = %s`,
		table, colGroup, arg(1), colResource, arg(2), colNamespace, arg(3), colName, arg(4),
	)

	_, err := tx.ExecContext(ctx, query, parsed.Group, parsed.Resource, parsed.Namespace, parsed.Name)
	return err
}
// upsertResourceVersion inserts/updates the resource_version table
//
// Maintains a monotonically non-decreasing resource_version per
// (group, resource) pair: the stored value is only replaced when the new
// version is strictly greater than the existing one.
func (w *sqlWriteCloser) upsertResourceVersion(ctx context.Context, tx db.Tx, parsed *parsedKey) error {
	// NOTE(review): ident() errors are discarded here (same concern as in
	// the other upsert helpers).
	tableIdent, _ := w.kv.ident("resource_version")
	groupIdent, _ := w.kv.ident("group")
	resourceIdent, _ := w.kv.ident("resource")
	rvIdent, _ := w.kv.ident("resource_version")
	ph := func(n int) string { return w.kv.dialect.ArgPlaceholder(n) }
	var query string
	switch w.kv.dialect.DialectName() {
	case "postgres":
		// Only update if new resource_version is greater than existing
		query = fmt.Sprintf(`
INSERT INTO %s (%s, %s, %s) VALUES (%s, %s, %s)
ON CONFLICT (%s, %s) DO UPDATE SET %s = EXCLUDED.%s
WHERE EXCLUDED.%s > %s.%s`,
			tableIdent, groupIdent, resourceIdent, rvIdent, ph(1), ph(2), ph(3),
			groupIdent, resourceIdent, rvIdent, rvIdent,
			rvIdent, tableIdent, rvIdent,
		)
	case "sqlite":
		// SQLite supports WHERE clause in ON CONFLICT DO UPDATE
		query = fmt.Sprintf(`
INSERT INTO %s (%s, %s, %s) VALUES (%s, %s, %s)
ON CONFLICT (%s, %s) DO UPDATE SET %s = EXCLUDED.%s
WHERE EXCLUDED.%s > %s.%s`,
			tableIdent, groupIdent, resourceIdent, rvIdent, ph(1), ph(2), ph(3),
			groupIdent, resourceIdent, rvIdent, rvIdent,
			rvIdent, tableIdent, rvIdent,
		)
	case "mysql":
		// MySQL uses ON DUPLICATE KEY UPDATE with conditional
		// (IF keeps the larger of the existing and incoming versions).
		query = fmt.Sprintf(`
INSERT INTO %s (%s, %s, %s) VALUES (%s, %s, %s)
ON DUPLICATE KEY UPDATE %s = IF(VALUES(%s) > %s, VALUES(%s), %s)`,
			tableIdent, groupIdent, resourceIdent, rvIdent, ph(1), ph(2), ph(3),
			rvIdent, rvIdent, rvIdent, rvIdent, rvIdent,
		)
	default:
		return fmt.Errorf("unsupported dialect: %s", w.kv.dialect.DialectName())
	}
	_, err := tx.ExecContext(ctx, query, parsed.Group, parsed.Resource, parsed.ResourceVersion)
	return err
}
// Delete removes the value stored under key in the given section.
// Returns ErrNotFound when no row exists for the key.
func (k *sqlKV) Delete(ctx context.Context, section string, key string) error {
	if section == "" {
		return fmt.Errorf("section is required")
	}
	if key == "" {
		return fmt.Errorf("key is required")
	}
	tableName, err := k.getTableName(section)
	if err != nil {
		return err
	}
	keyPathIdent, err := k.ident("key_path")
	if err != nil {
		return fmt.Errorf("invalid column identifier: %w", err)
	}
	// Delete in a single statement and use the affected-row count to detect
	// a missing key. The previous SELECT-then-DELETE approach required two
	// round trips and was racy: a concurrent writer could remove the row
	// between the existence check and the delete.
	deleteQuery := fmt.Sprintf(
		"DELETE FROM %s WHERE %s = %s",
		tableName,
		keyPathIdent,
		k.dialect.ArgPlaceholder(1),
	)
	res, err := k.db.ExecContext(ctx, deleteQuery, key)
	if err != nil {
		return fmt.Errorf("delete failed: %w", err)
	}
	affected, err := res.RowsAffected()
	if err != nil {
		return fmt.Errorf("delete failed: %w", err)
	}
	if affected == 0 {
		return ErrNotFound
	}
	return nil
}
// BatchDelete removes multiple keys from the store in a single statement.
// The operation is idempotent: keys that do not exist are silently ignored.
func (k *sqlKV) BatchDelete(ctx context.Context, section string, keys []string) error {
	if section == "" {
		return fmt.Errorf("section is required")
	}
	if len(keys) == 0 {
		// Nothing to delete.
		return nil
	}

	table, err := k.getTableName(section)
	if err != nil {
		return err
	}
	colKeyPath, err := k.ident("key_path")
	if err != nil {
		return fmt.Errorf("invalid column identifier: %w", err)
	}

	// One placeholder per key for the IN clause.
	placeholders := make([]string, 0, len(keys))
	args := make([]interface{}, 0, len(keys))
	for i, key := range keys {
		placeholders = append(placeholders, k.dialect.ArgPlaceholder(i+1))
		args = append(args, key)
	}

	query := fmt.Sprintf(
		"DELETE FROM %s WHERE %s IN (%s)",
		table,
		colKeyPath,
		strings.Join(placeholders, ", "),
	)
	if _, err := k.db.ExecContext(ctx, query, args...); err != nil {
		return fmt.Errorf("batch delete failed: %w", err)
	}
	return nil
}
// UnixTimestamp returns the current time in seconds since Epoch.
// The context is accepted for interface compatibility but not used.
func (k *sqlKV) UnixTimestamp(ctx context.Context) (int64, error) {
	now := time.Now()
	return now.Unix(), nil
}
// Ping checks if the database connection is alive.
// Returns an error when the connection is missing or unreachable.
func (k *sqlKV) Ping(ctx context.Context) error {
	conn := k.db
	if conn == nil {
		return fmt.Errorf("database connection is nil")
	}
	return conn.PingContext(ctx)
}
// checkDB verifies the database connection is still valid before operations
//
// Uses a fresh background context with a 1-second timeout rather than a
// caller-supplied context, so the check cannot hang indefinitely.
// NOTE(review): no caller is visible in this file section — confirm this
// helper is actually used (or intended as a future hook) before keeping it.
func (k *sqlKV) checkDB() error {
	if k.db == nil {
		return fmt.Errorf("database connection is nil")
	}
	// Quick ping to verify connection is alive
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	if err := k.db.PingContext(ctx); err != nil {
		return fmt.Errorf("database connection is not healthy: %w", err)
	}
	return nil
}

View File

@@ -66,9 +66,15 @@ type kvStorageBackend struct {
withExperimentalClusterScope bool
//tracer trace.Tracer
//reg prometheus.Registerer
// lifecycle management
ctx context.Context
cancel context.CancelFunc
}
var _ StorageBackend = &kvStorageBackend{}
var _ LifecycleHooks = &kvStorageBackend{}
var _ resourcepb.DiagnosticsServer = &kvStorageBackend{}
type KVBackendOptions struct {
KvStore KV
@@ -81,7 +87,6 @@ type KVBackendOptions struct {
}
func NewKVStorageBackend(opts KVBackendOptions) (StorageBackend, error) {
ctx := context.Background()
kv := opts.KvStore
s, err := snowflake.NewNode(rand.Int64N(1024))
@@ -100,6 +105,9 @@ func NewKVStorageBackend(opts KVBackendOptions) (StorageBackend, error) {
eventPruningInterval = defaultEventPruningInterval
}
// Create a cancellable context for lifecycle management
ctx, cancel := context.WithCancel(context.Background())
backend := &kvStorageBackend{
kv: kv,
dataStore: newDataStore(kv),
@@ -111,18 +119,58 @@ func NewKVStorageBackend(opts KVBackendOptions) (StorageBackend, error) {
eventRetentionPeriod: eventRetentionPeriod,
eventPruningInterval: eventPruningInterval,
withExperimentalClusterScope: opts.WithExperimentalClusterScope,
ctx: ctx,
cancel: cancel,
}
err = backend.initPruner(ctx)
if err != nil {
return nil, fmt.Errorf("failed to initialize pruner: %w", err)
}
// Start the event cleanup background job
go backend.runCleanupOldEvents(ctx)
return backend, nil
}
// Init implements LifecycleHooks
func (k *kvStorageBackend) Init(ctx context.Context) error {
// Initialize the pruner
if err := k.initPruner(ctx); err != nil {
return fmt.Errorf("failed to initialize pruner: %w", err)
}
// Start the event cleanup background job using the backend's lifecycle context
go k.runCleanupOldEvents(k.ctx)
// Start the pruner if it was configured
if k.historyPruner != nil {
k.historyPruner.Start(k.ctx)
}
return nil
}
// Stop implements LifecycleHooks
func (k *kvStorageBackend) Stop(ctx context.Context) error {
// Cancel the context to stop background goroutines
// This will stop both runCleanupOldEvents and the pruner (via debouncer)
k.cancel()
return nil
}
// IsHealthy implements DiagnosticsServer. When the underlying KV store
// exposes an optional Ping method (as sqlKV does), it is used to probe
// connectivity; stores without Ping are reported as serving unconditionally.
func (k *kvStorageBackend) IsHealthy(ctx context.Context, _ *resourcepb.HealthCheckRequest) (*resourcepb.HealthCheckResponse, error) {
	// Optional capability interface — only some KV implementations can be probed.
	type pinger interface {
		Ping(context.Context) error
	}
	serving := &resourcepb.HealthCheckResponse{Status: resourcepb.HealthCheckResponse_SERVING}
	p, ok := k.kv.(pinger)
	if !ok {
		// No probe available for this store; assume healthy.
		return serving, nil
	}
	if err := p.Ping(ctx); err != nil {
		return nil, fmt.Errorf("KV store health check failed: %w", err)
	}
	return serving, nil
}
// Read implements DiagnosticsServer. Diagnostic reads are not supported by
// the KV backend yet; every call returns ErrNotImplementedYet.
func (k *kvStorageBackend) Read(ctx context.Context, req *resourcepb.ReadRequest) (*resourcepb.ReadResponse, error) {
	return nil, ErrNotImplementedYet
}
// runCleanupOldEvents starts a background goroutine that periodically cleans up old events
func (k *kvStorageBackend) runCleanupOldEvents(ctx context.Context) {
// Run cleanup every hour
@@ -217,7 +265,6 @@ func (k *kvStorageBackend) initPruner(ctx context.Context) error {
}
k.historyPruner = pruner
k.historyPruner.Start(ctx)
return nil
}

View File

@@ -1,11 +1,45 @@
UPDATE {{ .Ident "resource_history" }}
SET {{ .Ident "resource_version" }} = (
SET
{{ .Ident "resource_version" }} = (
CASE
{{ range $guid, $rv := .GUIDToRV }}
WHEN {{ $.Ident "guid" }} = {{ $.Arg $guid }} THEN CAST({{ $.Arg $rv }} AS {{ if eq $.DialectName "postgres" }}BIGINT{{ else }}SIGNED{{ end }})
{{ end }}
END
),
{{ .Ident "key_path" }} = {{ if eq .DialectName "sqlite" -}}
{{ .Ident "group" }} || CHAR(47) || {{ .Ident "resource" }} || CHAR(47) || {{ .Ident "namespace" }} || CHAR(47) || {{ .Ident "name" }} || CHAR(47) ||
CAST((CASE
{{- range $guid, $rv := .GUIDToRV }}
WHEN {{ $.Ident "guid" }} = {{ $.Arg $guid }} THEN ((({{ $.Arg $rv }} / 1000) - 1288834974657) * 4194304) + ({{ $.Arg $rv }} % 1000)
{{- end }}
END) AS TEXT) || CHAR(126) ||
CASE {{ .Ident "action" }}
WHEN 1 THEN 'created'
WHEN 2 THEN 'updated'
WHEN 3 THEN 'deleted'
ELSE 'unknown'
END || CHAR(126) || COALESCE({{ .Ident "folder" }}, '')
{{- else -}}
CONCAT(
{{ .Ident "group" }}, CHAR(47),
{{ .Ident "resource" }}, CHAR(47),
{{ .Ident "namespace" }}, CHAR(47),
{{ .Ident "name" }}, CHAR(47),
CAST((CASE
{{- range $guid, $rv := .GUIDToRV }}
WHEN {{ $.Ident "guid" }} = {{ $.Arg $guid }} THEN ((({{ $.Arg $rv }} DIV 1000) - 1288834974657) * 4194304) + ({{ $.Arg $rv }} MOD 1000)
{{- end }}
END) AS {{ if eq .DialectName "postgres" }}TEXT{{ else }}CHAR{{ end }}), CHAR(126),
CASE {{ .Ident "action" }}
WHEN 1 THEN 'created'
WHEN 2 THEN 'updated'
WHEN 3 THEN 'deleted'
ELSE 'unknown'
END, CHAR(126),
COALESCE({{ .Ident "folder" }}, '')
)
{{- end }}
WHERE {{ .Ident "guid" }} IN (
{{$first := true}}
{{ range $guid, $rv := .GUIDToRV }}{{if $first}}{{$first = false}}{{else}}, {{end}}{{ $.Arg $guid }}{{ end }}

View File

@@ -3,7 +3,9 @@ package migrations
import (
"fmt"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
"github.com/grafana/grafana/pkg/util/xorm"
)
func initResourceTables(mg *migrator.Migrator) string {
@@ -185,5 +187,170 @@ func initResourceTables(mg *migrator.Migrator) string {
Name: "UQE_resource_last_import_time_last_import_time",
}))
// TODO: Do we want the value to be MEDIUMTEXT ?
// TODO: What's the best name for the key_path column?
// Add key_path column to resource_history for KV interface
mg.AddMigration("Add key_path column to resource_history", migrator.NewAddColumnMigration(resource_history_table, &migrator.Column{
Name: "key_path", Type: migrator.DB_NVarchar, Length: 2048, Nullable: true,
}))
// Backfill key_path column in resource_history
mg.AddMigration("Backfill key_path column in resource_history", &resourceHistoryKeyBackfillMigrator{})
// Note: key_path remains nullable because the write pattern is:
// 1. INSERT (key_path = NULL)
// 2. UPDATE (key_path = actual value after RV allocation)
// Add index on key_path column
mg.AddMigration("Add index on key_path column in resource_history", migrator.NewAddIndexMigration(resource_history_table, &migrator.Index{
Name: "IDX_resource_history_key_path",
Cols: []string{"key_path"},
Type: migrator.IndexType,
}))
// Create resource_events table for KV interface
resource_events_table := migrator.Table{
Name: "resource_events",
Columns: []*migrator.Column{
{Name: "key_path", Type: migrator.DB_NVarchar, Length: 2048, Nullable: false, IsPrimaryKey: true},
{Name: "value", Type: migrator.DB_MediumText, Nullable: false},
},
}
mg.AddMigration("create table "+resource_events_table.Name, migrator.NewAddTableMigration(resource_events_table))
return marker
}
// resourceHistoryKeyBackfillMigrator backfills the key_path column in the
// resource_history table using the pattern
// {Group}/{Resource}/{Namespace}/{Name}/{ResourceVersion}~{Action}~{Folder}.
// It processes rows in batches to reduce lock duration and avoid timeouts on
// large tables. The embedded MigrationBase supplies the default Migration
// plumbing; only SQL (description) and Exec (the work) are defined here.
type resourceHistoryKeyBackfillMigrator struct {
	migrator.MigrationBase
}
// SQL returns a human-readable description of the migration for the
// migration log; the actual dialect-specific statements are built in Exec.
func (m *resourceHistoryKeyBackfillMigrator) SQL(dialect migrator.Dialect) string {
	return "Backfill key_path column in resource_history using pattern: {Group}/{Resource}/{Namespace}/{Name}/{ResourceVersion}~{Action}~{Folder}"
}
// Exec performs the backfill. It repeatedly runs a single batched UPDATE
// (dialect-specific SQL) that derives key_path from each row's own columns,
// until no rows with NULL key_path remain, logging progress after each batch.
//
// The CAST expression converts the stored resource_version into a
// snowflake-style ID: ((rv/1000 - epochMs) << 22) + (rv % 1000), where
// 4194304 == 2^22 — i.e. it assumes rv is a microsecond timestamp whose
// millisecond part becomes the snowflake time field and whose sub-ms
// remainder becomes the sequence field.
func (m *resourceHistoryKeyBackfillMigrator) Exec(sess *xorm.Session, mg *migrator.Migrator) error {
	dialect := mg.Dialect.DriverName()
	logger := log.New("resource-history-key-backfill")

	// TODO: Verify the RV to Snowflake ID conversion is correct.
	// Snowflake ID epoch in milliseconds (2010-11-04T01:42:54.657Z)
	const epochMs = 1288834974657
	const batchSize = 1000 // Process 1000 rows at a time

	// Count total rows to backfill (used only for progress reporting below).
	totalCount, err := sess.Table("resource_history").Where("key_path IS NULL").Count()
	if err != nil {
		return fmt.Errorf("failed to count rows: %w", err)
	}
	if totalCount == 0 {
		logger.Info("No rows to backfill")
		return nil
	}
	logger.Info("Starting key_path backfill", "total_rows", totalCount)

	// Build the SQL query based on the database dialect.
	// All three variants take exactly two bind parameters, in the same
	// order: the snowflake epoch (epochMs) and the batch size.
	var updateSQL string
	switch dialect {
	case "mysql":
		// MySQL allows LIMIT directly on UPDATE, so no subquery is needed.
		// Columns like group/resource/name are backtick-quoted because some
		// are reserved words; DIV/MOD are MySQL's integer division operators.
		updateSQL = `
			UPDATE resource_history
			SET key_path = CONCAT(
				` + "`group`" + `, '/',
				` + "`resource`" + `, '/',
				` + "`namespace`" + `, '/',
				` + "`name`" + `, '/',
				CAST((((` + "`resource_version`" + ` DIV 1000) - ?) * 4194304) + (` + "`resource_version`" + ` MOD 1000) AS CHAR), '~',
				CASE ` + "`action`" + `
					WHEN 1 THEN 'created'
					WHEN 2 THEN 'updated'
					WHEN 3 THEN 'deleted'
					ELSE 'unknown'
				END, '~',
				COALESCE(` + "`folder`" + `, '')
			)
			WHERE key_path IS NULL
			LIMIT ?
		`
	case "postgres":
		// Postgres forbids LIMIT on UPDATE, so batching is done via a
		// guid IN (SELECT ... LIMIT $2) subquery. Integer '/' and '%'
		// truncate for bigint operands, matching MySQL's DIV/MOD above.
		updateSQL = `
			UPDATE resource_history
			SET key_path = CONCAT(
				"group", '/',
				"resource", '/',
				"namespace", '/',
				"name", '/',
				CAST((((resource_version / 1000) - $1) * 4194304) + (resource_version % 1000) AS BIGINT), '~',
				CASE "action"
					WHEN 1 THEN 'created'
					WHEN 2 THEN 'updated'
					WHEN 3 THEN 'deleted'
					ELSE 'unknown'
				END, '~',
				COALESCE("folder", '')
			)
			WHERE guid IN (
				SELECT guid FROM resource_history
				WHERE key_path IS NULL
				LIMIT $2
			)
		`
	case "sqlite3":
		// SQLite also lacks UPDATE ... LIMIT (in default builds), so the
		// same guid IN (subquery LIMIT ?) batching pattern is used, with
		// '||' string concatenation instead of CONCAT().
		updateSQL = `
			UPDATE resource_history
			SET key_path =
				"group" || '/' ||
				resource || '/' ||
				namespace || '/' ||
				name || '/' ||
				CAST((((resource_version / 1000) - ?) * 4194304) + (resource_version % 1000) AS TEXT) || '~' ||
				CASE action
					WHEN 1 THEN 'created'
					WHEN 2 THEN 'updated'
					WHEN 3 THEN 'deleted'
					ELSE 'unknown'
				END || '~' ||
				COALESCE(folder, '')
			WHERE guid IN (
				SELECT guid FROM resource_history
				WHERE key_path IS NULL
				LIMIT ?
			)
		`
	default:
		return fmt.Errorf("unsupported database dialect: %s", dialect)
	}

	// Process in batches. Each UPDATE sets key_path on up to batchSize rows,
	// so every iteration makes progress (updated rows no longer match
	// key_path IS NULL) and the loop must terminate.
	processed := int64(0)
	for {
		result, err := sess.Exec(updateSQL, epochMs, batchSize)
		if err != nil {
			return fmt.Errorf("failed to update batch: %w", err)
		}
		rowsAffected, err := result.RowsAffected()
		if err != nil {
			return fmt.Errorf("failed to get rows affected: %w", err)
		}
		processed += rowsAffected
		logger.Info("Backfill progress", "processed", processed, "total", totalCount,
			"percent", fmt.Sprintf("%.1f%%", float64(processed)/float64(totalCount)*100))
		// If we updated fewer rows than batch size, we're done
		if rowsAffected < int64(batchSize) {
			break
		}
	}
	logger.Info("Backfill completed", "total_processed", processed)
	return nil
}

View File

@@ -19,6 +19,7 @@ import (
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
"github.com/grafana/grafana/pkg/storage/unified/sql/db/dbimpl"
)
@@ -101,6 +102,30 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
//nolint:staticcheck // not yet migrated to OpenFeature
withPruner := opts.Features.IsEnabledGlobally(featuremgmt.FlagUnifiedStorageHistoryPruner)
// Check if KV backend is enabled via feature flag
//nolint:staticcheck // not yet migrated to OpenFeature
if opts.Features.IsEnabledGlobally(featuremgmt.FlagUnifiedStorageKVBackend) {
// Create SQL KV instance
sqlKV, err := resource.NewSQLKV(eDB)
if err != nil {
return nil, fmt.Errorf("create SQL KV: %w", err)
}
// Use existing KV storage backend (already implements StorageBackend interface)
kvBackend, err := resource.NewKVStorageBackend(resource.KVBackendOptions{
KvStore: sqlKV,
WithPruner: withPruner,
Tracer: opts.Tracer,
Reg: opts.Reg,
})
if err != nil {
return nil, fmt.Errorf("create KV backend: %w", err)
}
serverOptions.Backend = kvBackend
serverOptions.Lifecycle = kvBackend.(resource.LifecycleHooks)
serverOptions.Diagnostics = kvBackend.(resourcepb.DiagnosticsServer)
} else {
// Use existing SQL backend
backend, err := NewBackend(BackendOptions{
DBProvider: eDB,
Tracer: opts.Tracer,
@@ -117,6 +142,7 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
serverOptions.Diagnostics = backend
serverOptions.Lifecycle = backend
}
}
serverOptions.Search = opts.SearchOptions
serverOptions.IndexMetrics = opts.IndexMetrics