mirror of
https://github.com/grafana/grafana.git
synced 2025-12-21 03:54:29 +08:00
Compare commits
7 Commits
zoltan/pos
...
sql-kvstor
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f634f37b5f | ||
|
|
f5ef25233d | ||
|
|
e156dfd92a | ||
|
|
fa3d906d41 | ||
|
|
a8c86de2d6 | ||
|
|
761a8f6c31 | ||
|
|
3c0fc606ae |
@@ -912,6 +912,10 @@ export interface FeatureToggles {
|
||||
*/
|
||||
unifiedStorageGrpcConnectionPool?: boolean;
|
||||
/**
|
||||
* Use KV-backed SQL storage backend instead of direct SQL queries
|
||||
*/
|
||||
unifiedStorageKVBackend?: boolean;
|
||||
/**
|
||||
* Enables UI functionality to permanently delete alert rules
|
||||
* @default true
|
||||
*/
|
||||
|
||||
@@ -1580,6 +1580,14 @@ var (
|
||||
HideFromAdminPage: true,
|
||||
HideFromDocs: true,
|
||||
},
|
||||
{
|
||||
Name: "unifiedStorageKVBackend",
|
||||
Description: "Use KV-backed SQL storage backend instead of direct SQL queries",
|
||||
Stage: FeatureStageExperimental,
|
||||
Owner: grafanaSearchAndStorageSquad,
|
||||
HideFromAdminPage: true,
|
||||
HideFromDocs: true,
|
||||
},
|
||||
{
|
||||
Name: "alertingRulePermanentlyDelete",
|
||||
Description: "Enables UI functionality to permanently delete alert rules",
|
||||
|
||||
1
pkg/services/featuremgmt/toggles_gen.csv
generated
1
pkg/services/featuremgmt/toggles_gen.csv
generated
@@ -205,6 +205,7 @@ unifiedStorageHistoryPruner,GA,@grafana/search-and-storage,false,false,false
|
||||
azureMonitorLogsBuilderEditor,preview,@grafana/partner-datasources,false,false,false
|
||||
localeFormatPreference,preview,@grafana/grafana-frontend-platform,false,false,false
|
||||
unifiedStorageGrpcConnectionPool,experimental,@grafana/search-and-storage,false,false,false
|
||||
unifiedStorageKVBackend,experimental,@grafana/search-and-storage,false,false,false
|
||||
alertingRulePermanentlyDelete,GA,@grafana/alerting-squad,false,false,true
|
||||
alertingRuleRecoverDeleted,GA,@grafana/alerting-squad,false,false,true
|
||||
multiTenantTempCredentials,experimental,@grafana/aws-datasources,false,false,false
|
||||
|
||||
|
4
pkg/services/featuremgmt/toggles_gen.go
generated
4
pkg/services/featuremgmt/toggles_gen.go
generated
@@ -830,6 +830,10 @@ const (
|
||||
// Enables the unified storage grpc connection pool
|
||||
FlagUnifiedStorageGrpcConnectionPool = "unifiedStorageGrpcConnectionPool"
|
||||
|
||||
// FlagUnifiedStorageKVBackend
|
||||
// Use KV-backed SQL storage backend instead of direct SQL queries
|
||||
FlagUnifiedStorageKVBackend = "unifiedStorageKVBackend"
|
||||
|
||||
// FlagAlertingRulePermanentlyDelete
|
||||
// Enables UI functionality to permanently delete alert rules
|
||||
FlagAlertingRulePermanentlyDelete = "alertingRulePermanentlyDelete"
|
||||
|
||||
14
pkg/services/featuremgmt/toggles_gen.json
generated
14
pkg/services/featuremgmt/toggles_gen.json
generated
@@ -4208,6 +4208,20 @@
|
||||
"expression": "true"
|
||||
}
|
||||
},
|
||||
{
|
||||
"metadata": {
|
||||
"name": "unifiedStorageKVBackend",
|
||||
"resourceVersion": "1763461706359",
|
||||
"creationTimestamp": "2025-11-18T10:28:26Z"
|
||||
},
|
||||
"spec": {
|
||||
"description": "Use KV-backed SQL storage backend instead of direct SQL queries",
|
||||
"stage": "experimental",
|
||||
"codeowner": "@grafana/search-and-storage",
|
||||
"hideFromAdminPage": true,
|
||||
"hideFromDocs": true
|
||||
}
|
||||
},
|
||||
{
|
||||
"metadata": {
|
||||
"name": "unifiedStorageSearch",
|
||||
|
||||
835
pkg/storage/unified/resource/sqlkv.go
Normal file
835
pkg/storage/unified/resource/sqlkv.go
Normal file
@@ -0,0 +1,835 @@
|
||||
package resource
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"iter"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/bwmarrin/snowflake"
|
||||
"github.com/google/uuid"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/sql/db"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate"
|
||||
)
|
||||
|
||||
const (
|
||||
sectionData = "unified/data"
|
||||
sectionEvents = "unified/events"
|
||||
)
|
||||
|
||||
// sqlKV implements the KV interface using SQL storage
|
||||
type sqlKV struct {
|
||||
dbProvider db.DBProvider // Keep reference to prevent GC
|
||||
db db.DB
|
||||
dialect sqltemplate.Dialect
|
||||
}
|
||||
|
||||
// NewSQLKV creates a new SQL-based KV store
|
||||
func NewSQLKV(dbProvider db.DBProvider) (KV, error) {
|
||||
if dbProvider == nil {
|
||||
return nil, errors.New("dbProvider is required")
|
||||
}
|
||||
|
||||
// Initialize the database connection
|
||||
ctx := context.Background()
|
||||
dbConn, err := dbProvider.Init(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("initialize DB: %w", err)
|
||||
}
|
||||
|
||||
// Determine the SQL dialect
|
||||
var dialect sqltemplate.Dialect
|
||||
switch dbConn.DriverName() {
|
||||
case "mysql":
|
||||
dialect = sqltemplate.MySQL
|
||||
case "postgres":
|
||||
dialect = sqltemplate.PostgreSQL
|
||||
case "sqlite3", "sqlite":
|
||||
dialect = sqltemplate.SQLite
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported database driver: %s", dbConn.DriverName())
|
||||
}
|
||||
|
||||
return &sqlKV{
|
||||
dbProvider: dbProvider, // Keep reference to prevent GC from closing the database
|
||||
db: dbConn,
|
||||
dialect: dialect,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Verify that sqlKV implements KV interface
|
||||
var _ KV = &sqlKV{}
|
||||
|
||||
// Helper function to build identifiers safely
|
||||
func (k *sqlKV) ident(name string) (string, error) {
|
||||
return k.dialect.Ident(name)
|
||||
}
|
||||
|
||||
// Helper function to get table name for a section
|
||||
func (k *sqlKV) getTableName(section string) (string, error) {
|
||||
switch section {
|
||||
case sectionData:
|
||||
return k.ident("resource_history")
|
||||
case sectionEvents:
|
||||
return k.ident("resource_events")
|
||||
default:
|
||||
return "", fmt.Errorf("unsupported section: %s", section)
|
||||
}
|
||||
}
|
||||
|
||||
// parsedKey represents the components of a key_path for the data section
|
||||
// Format: {Group}/{Resource}/{Namespace}/{Name}/{ResourceVersion}~{Action}~{Folder}
|
||||
type parsedKey struct {
|
||||
Group string
|
||||
Resource string
|
||||
Namespace string
|
||||
Name string
|
||||
ResourceVersion int64 // Microsecond timestamp
|
||||
ResourceVersionSnowflake int64 // Original snowflake ID from key
|
||||
Action int // 1: create, 2: update, 3: delete
|
||||
Folder string
|
||||
}
|
||||
|
||||
// snowflakeToMicroseconds converts a snowflake ID to a unix microsecond timestamp
|
||||
// Uses the snowflake library's Time() method which handles the Grafana epoch internally
|
||||
func snowflakeToMicroseconds(snowflakeID int64) int64 {
|
||||
// Extract unix milliseconds from snowflake (handles Grafana epoch internally)
|
||||
unixMilliseconds := snowflake.ID(snowflakeID).Time()
|
||||
|
||||
// Extract sequence number (low 12 bits) for sub-millisecond precision
|
||||
sequence := snowflakeID & 0xFFF // 0xFFF = 4095 = 2^12 - 1
|
||||
|
||||
// Convert to unix microseconds: (unix_ms * 1000) + sequence
|
||||
return (unixMilliseconds * 1000) + sequence
|
||||
}
|
||||
|
||||
// parseDataKey parses a data section key_path
|
||||
func parseDataKey(keyPath string) (*parsedKey, error) {
|
||||
// Split by ~ to separate main key from action and folder
|
||||
parts := strings.Split(keyPath, "~")
|
||||
if len(parts) != 3 {
|
||||
return nil, fmt.Errorf("invalid key format: expected 3 parts separated by '~', got %d", len(parts))
|
||||
}
|
||||
|
||||
// Split main key by /
|
||||
mainParts := strings.Split(parts[0], "/")
|
||||
if len(mainParts) != 5 {
|
||||
return nil, fmt.Errorf("invalid key format: expected 5 parts separated by '/', got %d", len(mainParts))
|
||||
}
|
||||
|
||||
// Parse resource version (stored as snowflake ID in key)
|
||||
snowflakeID, err := strconv.ParseInt(mainParts[4], 10, 64)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid resource_version: %w", err)
|
||||
}
|
||||
|
||||
// Convert snowflake ID to microsecond timestamp for database storage
|
||||
microseconds := snowflakeToMicroseconds(snowflakeID)
|
||||
|
||||
// Convert action string to int
|
||||
var action int
|
||||
switch parts[1] {
|
||||
case "created":
|
||||
action = 1
|
||||
case "updated":
|
||||
action = 2
|
||||
case "deleted":
|
||||
action = 3
|
||||
default:
|
||||
return nil, fmt.Errorf("invalid action: %s", parts[1])
|
||||
}
|
||||
|
||||
return &parsedKey{
|
||||
Group: mainParts[0],
|
||||
Resource: mainParts[1],
|
||||
Namespace: mainParts[2],
|
||||
Name: mainParts[3],
|
||||
ResourceVersion: microseconds, // Microsecond timestamp for DB
|
||||
ResourceVersionSnowflake: snowflakeID, // Original snowflake ID
|
||||
Action: action,
|
||||
Folder: parts[2], // May be empty string
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Get retrieves the value for a key from the store
|
||||
func (k *sqlKV) Get(ctx context.Context, section string, key string) (io.ReadCloser, error) {
|
||||
if section == "" {
|
||||
return nil, fmt.Errorf("section is required")
|
||||
}
|
||||
if key == "" {
|
||||
return nil, fmt.Errorf("key is required")
|
||||
}
|
||||
|
||||
tableName, err := k.getTableName(section)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
valueIdent, err := k.ident("value")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid column identifier: %w", err)
|
||||
}
|
||||
keyPathIdent, err := k.ident("key_path")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid column identifier: %w", err)
|
||||
}
|
||||
|
||||
query := fmt.Sprintf(
|
||||
"SELECT %s FROM %s WHERE %s = %s",
|
||||
valueIdent,
|
||||
tableName,
|
||||
keyPathIdent,
|
||||
k.dialect.ArgPlaceholder(1),
|
||||
)
|
||||
|
||||
// Execute the query
|
||||
var value []byte
|
||||
err = k.db.QueryRowContext(ctx, query, key).Scan(&value)
|
||||
if err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return nil, ErrNotFound
|
||||
}
|
||||
return nil, fmt.Errorf("query failed: %w", err)
|
||||
}
|
||||
|
||||
return io.NopCloser(bytes.NewReader(value)), nil
|
||||
}
|
||||
|
||||
// BatchGet retrieves multiple values for the given keys from the store
|
||||
// Uses a UNION ALL subquery with LEFT JOIN to preserve order and enable streaming
|
||||
func (k *sqlKV) BatchGet(ctx context.Context, section string, keys []string) iter.Seq2[KeyValue, error] {
|
||||
if section == "" {
|
||||
return func(yield func(KeyValue, error) bool) {
|
||||
yield(KeyValue{}, fmt.Errorf("section is required"))
|
||||
}
|
||||
}
|
||||
|
||||
if len(keys) == 0 {
|
||||
return func(yield func(KeyValue, error) bool) {
|
||||
// Empty result set - nothing to yield
|
||||
}
|
||||
}
|
||||
|
||||
return func(yield func(KeyValue, error) bool) {
|
||||
tableName, err := k.getTableName(section)
|
||||
if err != nil {
|
||||
yield(KeyValue{}, err)
|
||||
return
|
||||
}
|
||||
|
||||
keyPathIdent, err := k.ident("key_path")
|
||||
if err != nil {
|
||||
yield(KeyValue{}, fmt.Errorf("invalid column identifier: %w", err))
|
||||
return
|
||||
}
|
||||
|
||||
valueIdent, err := k.ident("value")
|
||||
if err != nil {
|
||||
yield(KeyValue{}, fmt.Errorf("invalid column identifier: %w", err))
|
||||
return
|
||||
}
|
||||
|
||||
// Build UNION ALL subquery to preserve key order
|
||||
// SELECT 0 AS idx, ? AS kp UNION ALL SELECT 1, ? UNION ALL ...
|
||||
var unionParts []string
|
||||
var args []interface{}
|
||||
argNum := 1
|
||||
|
||||
for i, key := range keys {
|
||||
if i == 0 {
|
||||
unionParts = append(unionParts, fmt.Sprintf(
|
||||
"SELECT %s AS idx, %s AS kp",
|
||||
k.dialect.ArgPlaceholder(argNum),
|
||||
k.dialect.ArgPlaceholder(argNum+1),
|
||||
))
|
||||
} else {
|
||||
unionParts = append(unionParts, fmt.Sprintf(
|
||||
"UNION ALL SELECT %s, %s",
|
||||
k.dialect.ArgPlaceholder(argNum),
|
||||
k.dialect.ArgPlaceholder(argNum+1),
|
||||
))
|
||||
}
|
||||
args = append(args, i, key)
|
||||
argNum += 2
|
||||
}
|
||||
|
||||
// Build the full query with LEFT JOIN to preserve order
|
||||
// This allows streaming results directly without buffering
|
||||
query := fmt.Sprintf(
|
||||
"SELECT v.idx, t.%s, t.%s FROM (%s) AS v LEFT JOIN %s t ON t.%s = v.kp ORDER BY v.idx",
|
||||
keyPathIdent,
|
||||
valueIdent,
|
||||
strings.Join(unionParts, " "),
|
||||
tableName,
|
||||
keyPathIdent,
|
||||
)
|
||||
|
||||
// Execute the query
|
||||
rows, err := k.db.QueryContext(ctx, query, args...)
|
||||
if err != nil {
|
||||
yield(KeyValue{}, fmt.Errorf("query failed: %w", err))
|
||||
return
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
// Stream results directly - no buffering needed!
|
||||
// Results come back in the order specified by idx
|
||||
for rows.Next() {
|
||||
var idx int
|
||||
var keyPath sql.NullString
|
||||
var value []byte
|
||||
|
||||
if err := rows.Scan(&idx, &keyPath, &value); err != nil {
|
||||
yield(KeyValue{}, fmt.Errorf("scan failed: %w", err))
|
||||
return
|
||||
}
|
||||
|
||||
// Skip keys that don't exist (LEFT JOIN returns NULL)
|
||||
if !keyPath.Valid {
|
||||
continue
|
||||
}
|
||||
|
||||
kv := KeyValue{
|
||||
Key: keyPath.String,
|
||||
Value: io.NopCloser(bytes.NewReader(value)),
|
||||
}
|
||||
|
||||
if !yield(kv, nil) {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if err := rows.Err(); err != nil {
|
||||
yield(KeyValue{}, fmt.Errorf("rows error: %w", err))
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Keys returns all the keys in the store
|
||||
func (k *sqlKV) Keys(ctx context.Context, section string, opt ListOptions) iter.Seq2[string, error] {
|
||||
if section == "" {
|
||||
return func(yield func(string, error) bool) {
|
||||
yield("", fmt.Errorf("section is required"))
|
||||
}
|
||||
}
|
||||
|
||||
return func(yield func(string, error) bool) {
|
||||
tableName, err := k.getTableName(section)
|
||||
if err != nil {
|
||||
yield("", err)
|
||||
return
|
||||
}
|
||||
|
||||
keyPathIdent, err := k.ident("key_path")
|
||||
if err != nil {
|
||||
yield("", fmt.Errorf("invalid column identifier: %w", err))
|
||||
return
|
||||
}
|
||||
|
||||
// Build WHERE clauses
|
||||
var whereClauses []string
|
||||
var args []interface{}
|
||||
argNum := 1
|
||||
|
||||
// Start key (inclusive)
|
||||
if opt.StartKey != "" {
|
||||
whereClauses = append(whereClauses, fmt.Sprintf("%s >= %s", keyPathIdent, k.dialect.ArgPlaceholder(argNum)))
|
||||
args = append(args, opt.StartKey)
|
||||
argNum++
|
||||
}
|
||||
|
||||
// End key (exclusive)
|
||||
if opt.EndKey != "" {
|
||||
whereClauses = append(whereClauses, fmt.Sprintf("%s < %s", keyPathIdent, k.dialect.ArgPlaceholder(argNum)))
|
||||
args = append(args, opt.EndKey)
|
||||
argNum++
|
||||
}
|
||||
|
||||
// Build ORDER BY clause
|
||||
orderBy := "ASC"
|
||||
if opt.Sort == SortOrderDesc {
|
||||
orderBy = "DESC"
|
||||
}
|
||||
|
||||
// Build the query
|
||||
query := fmt.Sprintf(
|
||||
"SELECT %s FROM %s",
|
||||
keyPathIdent,
|
||||
tableName,
|
||||
)
|
||||
|
||||
if len(whereClauses) > 0 {
|
||||
query += " WHERE " + strings.Join(whereClauses, " AND ")
|
||||
}
|
||||
|
||||
query += fmt.Sprintf(" ORDER BY %s %s", keyPathIdent, orderBy)
|
||||
|
||||
if opt.Limit > 0 {
|
||||
query += fmt.Sprintf(" LIMIT %d", opt.Limit)
|
||||
}
|
||||
|
||||
// Execute the query
|
||||
rows, err := k.db.QueryContext(ctx, query, args...)
|
||||
if err != nil {
|
||||
yield("", fmt.Errorf("query failed: %w", err))
|
||||
return
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
// Yield each key
|
||||
for rows.Next() {
|
||||
var keyPath string
|
||||
if err := rows.Scan(&keyPath); err != nil {
|
||||
yield("", fmt.Errorf("scan failed: %w", err))
|
||||
return
|
||||
}
|
||||
if !yield(keyPath, nil) {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if err := rows.Err(); err != nil {
|
||||
yield("", fmt.Errorf("rows error: %w", err))
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Save a new value - returns a WriteCloser to write the value to
|
||||
func (k *sqlKV) Save(ctx context.Context, section string, key string) (io.WriteCloser, error) {
|
||||
if section == "" {
|
||||
return nil, fmt.Errorf("section is required")
|
||||
}
|
||||
if key == "" {
|
||||
return nil, fmt.Errorf("key is required")
|
||||
}
|
||||
|
||||
return &sqlWriteCloser{
|
||||
kv: k,
|
||||
ctx: ctx,
|
||||
section: section,
|
||||
key: key,
|
||||
buf: &bytes.Buffer{},
|
||||
closed: false,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// sqlWriteCloser implements io.WriteCloser for SQL KV Save operations
|
||||
type sqlWriteCloser struct {
|
||||
kv *sqlKV
|
||||
ctx context.Context
|
||||
section string
|
||||
key string
|
||||
buf *bytes.Buffer
|
||||
closed bool
|
||||
}
|
||||
|
||||
// Write implements io.Writer
|
||||
func (w *sqlWriteCloser) Write(p []byte) (int, error) {
|
||||
if w.closed {
|
||||
return 0, fmt.Errorf("write to closed writer")
|
||||
}
|
||||
return w.buf.Write(p)
|
||||
}
|
||||
|
||||
// Close implements io.Closer - stores the buffered data in SQL
|
||||
func (w *sqlWriteCloser) Close() error {
|
||||
if w.closed {
|
||||
return nil
|
||||
}
|
||||
w.closed = true
|
||||
|
||||
value := w.buf.Bytes()
|
||||
|
||||
switch w.section {
|
||||
case sectionEvents:
|
||||
// Simple upsert for events section
|
||||
return w.closeEvents(value)
|
||||
case sectionData:
|
||||
// Complex multi-table transaction for data section
|
||||
return w.closeData(value)
|
||||
default:
|
||||
return fmt.Errorf("unsupported section: %s", w.section)
|
||||
}
|
||||
}
|
||||
|
||||
// closeEvents handles the simple upsert for the events section
|
||||
func (w *sqlWriteCloser) closeEvents(value []byte) error {
|
||||
tableName, err := w.kv.getTableName(w.section)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
keyPathIdent, err := w.kv.ident("key_path")
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid column identifier: %w", err)
|
||||
}
|
||||
|
||||
valueIdent, err := w.kv.ident("value")
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid column identifier: %w", err)
|
||||
}
|
||||
|
||||
ph1 := w.kv.dialect.ArgPlaceholder(1)
|
||||
ph2 := w.kv.dialect.ArgPlaceholder(2)
|
||||
|
||||
var query string
|
||||
switch w.kv.dialect.DialectName() {
|
||||
case "postgres":
|
||||
query = fmt.Sprintf(
|
||||
"INSERT INTO %s (%s, %s) VALUES (%s, %s) ON CONFLICT (%s) DO UPDATE SET %s = EXCLUDED.%s",
|
||||
tableName, keyPathIdent, valueIdent, ph1, ph2, keyPathIdent, valueIdent, valueIdent,
|
||||
)
|
||||
case "mysql":
|
||||
query = fmt.Sprintf(
|
||||
"INSERT INTO %s (%s, %s) VALUES (%s, %s) ON DUPLICATE KEY UPDATE %s = VALUES(%s)",
|
||||
tableName, keyPathIdent, valueIdent, ph1, ph2, valueIdent, valueIdent,
|
||||
)
|
||||
case "sqlite":
|
||||
query = fmt.Sprintf(
|
||||
"INSERT INTO %s (%s, %s) VALUES (%s, %s) ON CONFLICT (%s) DO UPDATE SET %s = excluded.%s",
|
||||
tableName, keyPathIdent, valueIdent, ph1, ph2, keyPathIdent, valueIdent, valueIdent,
|
||||
)
|
||||
default:
|
||||
return fmt.Errorf("unsupported dialect: %s", w.kv.dialect.DialectName())
|
||||
}
|
||||
|
||||
_, err = w.kv.db.ExecContext(w.ctx, query, w.key, value)
|
||||
if err != nil {
|
||||
return fmt.Errorf("insert/update failed: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// closeData handles the complex multi-table transaction for the data section
|
||||
func (w *sqlWriteCloser) closeData(value []byte) error {
|
||||
// Parse the key to extract all fields
|
||||
parsed, err := parseDataKey(w.key)
|
||||
if err != nil {
|
||||
return fmt.Errorf("parse key: %w", err)
|
||||
}
|
||||
|
||||
// Generate a GUID for this write
|
||||
guid := uuid.New().String()
|
||||
|
||||
// Execute all operations in a transaction
|
||||
return w.kv.db.WithTx(w.ctx, nil, func(ctx context.Context, tx db.Tx) error {
|
||||
// 1. Insert/update resource_history
|
||||
if err := w.upsertResourceHistory(ctx, tx, parsed, guid, value); err != nil {
|
||||
return fmt.Errorf("upsert resource_history: %w", err)
|
||||
}
|
||||
|
||||
// 2. Handle resource table based on action
|
||||
if parsed.Action == 3 { // deleted
|
||||
if err := w.deleteResource(ctx, tx, parsed); err != nil {
|
||||
return fmt.Errorf("delete resource: %w", err)
|
||||
}
|
||||
} else { // created or updated
|
||||
if err := w.upsertResource(ctx, tx, parsed, guid, value); err != nil {
|
||||
return fmt.Errorf("upsert resource: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// 3. Upsert resource_version table
|
||||
if err := w.upsertResourceVersion(ctx, tx, parsed); err != nil {
|
||||
return fmt.Errorf("upsert resource_version: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// upsertResourceHistory inserts/updates a row in the resource_history table
|
||||
func (w *sqlWriteCloser) upsertResourceHistory(ctx context.Context, tx db.Tx, parsed *parsedKey, guid string, value []byte) error {
|
||||
// Build identifiers
|
||||
tableIdent, _ := w.kv.ident("resource_history")
|
||||
guidIdent, _ := w.kv.ident("guid")
|
||||
groupIdent, _ := w.kv.ident("group")
|
||||
resourceIdent, _ := w.kv.ident("resource")
|
||||
namespaceIdent, _ := w.kv.ident("namespace")
|
||||
nameIdent, _ := w.kv.ident("name")
|
||||
rvIdent, _ := w.kv.ident("resource_version")
|
||||
prevRVIdent, _ := w.kv.ident("previous_resource_version")
|
||||
valueIdent, _ := w.kv.ident("value")
|
||||
actionIdent, _ := w.kv.ident("action")
|
||||
folderIdent, _ := w.kv.ident("folder")
|
||||
keyPathIdent, _ := w.kv.ident("key_path")
|
||||
|
||||
// Build placeholders
|
||||
var query string
|
||||
ph := func(n int) string { return w.kv.dialect.ArgPlaceholder(n) }
|
||||
|
||||
switch w.kv.dialect.DialectName() {
|
||||
case "postgres":
|
||||
query = fmt.Sprintf(`
|
||||
INSERT INTO %s (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
|
||||
VALUES (%s, %s, %s, %s, %s, %s, %s, 0, %s, %s, %s)
|
||||
ON CONFLICT (%s) DO UPDATE SET %s = EXCLUDED.%s, %s = EXCLUDED.%s`,
|
||||
tableIdent, guidIdent, keyPathIdent, groupIdent, resourceIdent, namespaceIdent, nameIdent,
|
||||
rvIdent, prevRVIdent, valueIdent, actionIdent, folderIdent,
|
||||
ph(1), ph(2), ph(3), ph(4), ph(5), ph(6), ph(7), ph(8), ph(9), ph(10),
|
||||
guidIdent, valueIdent, valueIdent, keyPathIdent, keyPathIdent,
|
||||
)
|
||||
case "mysql", "sqlite":
|
||||
// For MySQL and SQLite, use INSERT OR REPLACE (requires all columns)
|
||||
query = fmt.Sprintf(`
|
||||
REPLACE INTO %s (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
|
||||
VALUES (%s, %s, %s, %s, %s, %s, %s, 0, %s, %s, %s)`,
|
||||
tableIdent, guidIdent, keyPathIdent, groupIdent, resourceIdent, namespaceIdent, nameIdent,
|
||||
rvIdent, prevRVIdent, valueIdent, actionIdent, folderIdent,
|
||||
ph(1), ph(2), ph(3), ph(4), ph(5), ph(6), ph(7), ph(8), ph(9), ph(10),
|
||||
)
|
||||
default:
|
||||
return fmt.Errorf("unsupported dialect: %s", w.kv.dialect.DialectName())
|
||||
}
|
||||
|
||||
_, err := tx.ExecContext(ctx, query,
|
||||
guid, w.key, parsed.Group, parsed.Resource, parsed.Namespace, parsed.Name,
|
||||
parsed.ResourceVersion, value, parsed.Action, parsed.Folder,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
// upsertResource inserts/updates a row in the resource table
|
||||
func (w *sqlWriteCloser) upsertResource(ctx context.Context, tx db.Tx, parsed *parsedKey, guid string, value []byte) error {
|
||||
// Build identifiers
|
||||
tableIdent, _ := w.kv.ident("resource")
|
||||
guidIdent, _ := w.kv.ident("guid")
|
||||
groupIdent, _ := w.kv.ident("group")
|
||||
resourceIdent, _ := w.kv.ident("resource")
|
||||
namespaceIdent, _ := w.kv.ident("namespace")
|
||||
nameIdent, _ := w.kv.ident("name")
|
||||
rvIdent, _ := w.kv.ident("resource_version")
|
||||
valueIdent, _ := w.kv.ident("value")
|
||||
actionIdent, _ := w.kv.ident("action")
|
||||
|
||||
var query string
|
||||
ph := func(n int) string { return w.kv.dialect.ArgPlaceholder(n) }
|
||||
|
||||
switch w.kv.dialect.DialectName() {
|
||||
case "postgres":
|
||||
query = fmt.Sprintf(`
|
||||
INSERT INTO %s (%s, %s, %s, %s, %s, %s, %s, %s)
|
||||
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
|
||||
ON CONFLICT (%s, %s, %s, %s) DO UPDATE SET
|
||||
%s = EXCLUDED.%s, %s = EXCLUDED.%s, %s = EXCLUDED.%s, %s = EXCLUDED.%s`,
|
||||
tableIdent, guidIdent, groupIdent, resourceIdent, namespaceIdent, nameIdent, rvIdent, valueIdent, actionIdent,
|
||||
ph(1), ph(2), ph(3), ph(4), ph(5), ph(6), ph(7), ph(8),
|
||||
namespaceIdent, groupIdent, resourceIdent, nameIdent,
|
||||
guidIdent, guidIdent, rvIdent, rvIdent, valueIdent, valueIdent, actionIdent, actionIdent,
|
||||
)
|
||||
case "mysql", "sqlite":
|
||||
query = fmt.Sprintf(`
|
||||
REPLACE INTO %s (%s, %s, %s, %s, %s, %s, %s, %s)
|
||||
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)`,
|
||||
tableIdent, guidIdent, groupIdent, resourceIdent, namespaceIdent, nameIdent, rvIdent, valueIdent, actionIdent,
|
||||
ph(1), ph(2), ph(3), ph(4), ph(5), ph(6), ph(7), ph(8),
|
||||
)
|
||||
default:
|
||||
return fmt.Errorf("unsupported dialect: %s", w.kv.dialect.DialectName())
|
||||
}
|
||||
|
||||
_, err := tx.ExecContext(ctx, query,
|
||||
guid, parsed.Group, parsed.Resource, parsed.Namespace, parsed.Name,
|
||||
parsed.ResourceVersion, value, parsed.Action,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
// deleteResource deletes a row from the resource table
|
||||
func (w *sqlWriteCloser) deleteResource(ctx context.Context, tx db.Tx, parsed *parsedKey) error {
|
||||
tableIdent, _ := w.kv.ident("resource")
|
||||
groupIdent, _ := w.kv.ident("group")
|
||||
resourceIdent, _ := w.kv.ident("resource")
|
||||
namespaceIdent, _ := w.kv.ident("namespace")
|
||||
nameIdent, _ := w.kv.ident("name")
|
||||
|
||||
ph := func(n int) string { return w.kv.dialect.ArgPlaceholder(n) }
|
||||
|
||||
query := fmt.Sprintf(`
|
||||
DELETE FROM %s WHERE %s = %s AND %s = %s AND %s = %s AND %s = %s`,
|
||||
tableIdent, groupIdent, ph(1), resourceIdent, ph(2), namespaceIdent, ph(3), nameIdent, ph(4),
|
||||
)
|
||||
|
||||
_, err := tx.ExecContext(ctx, query, parsed.Group, parsed.Resource, parsed.Namespace, parsed.Name)
|
||||
return err
|
||||
}
|
||||
|
||||
// upsertResourceVersion inserts/updates the resource_version table
|
||||
func (w *sqlWriteCloser) upsertResourceVersion(ctx context.Context, tx db.Tx, parsed *parsedKey) error {
|
||||
tableIdent, _ := w.kv.ident("resource_version")
|
||||
groupIdent, _ := w.kv.ident("group")
|
||||
resourceIdent, _ := w.kv.ident("resource")
|
||||
rvIdent, _ := w.kv.ident("resource_version")
|
||||
|
||||
ph := func(n int) string { return w.kv.dialect.ArgPlaceholder(n) }
|
||||
|
||||
var query string
|
||||
switch w.kv.dialect.DialectName() {
|
||||
case "postgres":
|
||||
// Only update if new resource_version is greater than existing
|
||||
query = fmt.Sprintf(`
|
||||
INSERT INTO %s (%s, %s, %s) VALUES (%s, %s, %s)
|
||||
ON CONFLICT (%s, %s) DO UPDATE SET %s = EXCLUDED.%s
|
||||
WHERE EXCLUDED.%s > %s.%s`,
|
||||
tableIdent, groupIdent, resourceIdent, rvIdent, ph(1), ph(2), ph(3),
|
||||
groupIdent, resourceIdent, rvIdent, rvIdent,
|
||||
rvIdent, tableIdent, rvIdent,
|
||||
)
|
||||
case "sqlite":
|
||||
// SQLite supports WHERE clause in ON CONFLICT DO UPDATE
|
||||
query = fmt.Sprintf(`
|
||||
INSERT INTO %s (%s, %s, %s) VALUES (%s, %s, %s)
|
||||
ON CONFLICT (%s, %s) DO UPDATE SET %s = EXCLUDED.%s
|
||||
WHERE EXCLUDED.%s > %s.%s`,
|
||||
tableIdent, groupIdent, resourceIdent, rvIdent, ph(1), ph(2), ph(3),
|
||||
groupIdent, resourceIdent, rvIdent, rvIdent,
|
||||
rvIdent, tableIdent, rvIdent,
|
||||
)
|
||||
case "mysql":
|
||||
// MySQL uses ON DUPLICATE KEY UPDATE with conditional
|
||||
query = fmt.Sprintf(`
|
||||
INSERT INTO %s (%s, %s, %s) VALUES (%s, %s, %s)
|
||||
ON DUPLICATE KEY UPDATE %s = IF(VALUES(%s) > %s, VALUES(%s), %s)`,
|
||||
tableIdent, groupIdent, resourceIdent, rvIdent, ph(1), ph(2), ph(3),
|
||||
rvIdent, rvIdent, rvIdent, rvIdent, rvIdent,
|
||||
)
|
||||
default:
|
||||
return fmt.Errorf("unsupported dialect: %s", w.kv.dialect.DialectName())
|
||||
}
|
||||
|
||||
_, err := tx.ExecContext(ctx, query, parsed.Group, parsed.Resource, parsed.ResourceVersion)
|
||||
return err
|
||||
}
|
||||
|
||||
// Delete a value
|
||||
func (k *sqlKV) Delete(ctx context.Context, section string, key string) error {
|
||||
if section == "" {
|
||||
return fmt.Errorf("section is required")
|
||||
}
|
||||
if key == "" {
|
||||
return fmt.Errorf("key is required")
|
||||
}
|
||||
|
||||
tableName, err := k.getTableName(section)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
keyPathIdent, err := k.ident("key_path")
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid column identifier: %w", err)
|
||||
}
|
||||
|
||||
// First check if key exists (to return ErrNotFound if missing)
|
||||
checkQuery := fmt.Sprintf(
|
||||
"SELECT 1 FROM %s WHERE %s = %s",
|
||||
tableName,
|
||||
keyPathIdent,
|
||||
k.dialect.ArgPlaceholder(1),
|
||||
)
|
||||
|
||||
var exists int
|
||||
err = k.db.QueryRowContext(ctx, checkQuery, key).Scan(&exists)
|
||||
if err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return ErrNotFound
|
||||
}
|
||||
return fmt.Errorf("check existence failed: %w", err)
|
||||
}
|
||||
|
||||
// Delete the key
|
||||
deleteQuery := fmt.Sprintf(
|
||||
"DELETE FROM %s WHERE %s = %s",
|
||||
tableName,
|
||||
keyPathIdent,
|
||||
k.dialect.ArgPlaceholder(1),
|
||||
)
|
||||
|
||||
_, err = k.db.ExecContext(ctx, deleteQuery, key)
|
||||
if err != nil {
|
||||
return fmt.Errorf("delete failed: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// BatchDelete removes multiple keys from the store
|
||||
func (k *sqlKV) BatchDelete(ctx context.Context, section string, keys []string) error {
|
||||
if section == "" {
|
||||
return fmt.Errorf("section is required")
|
||||
}
|
||||
|
||||
if len(keys) == 0 {
|
||||
return nil // Nothing to delete
|
||||
}
|
||||
|
||||
tableName, err := k.getTableName(section)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
keyPathIdent, err := k.ident("key_path")
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid column identifier: %w", err)
|
||||
}
|
||||
|
||||
// Build IN clause placeholders
|
||||
placeholders := make([]string, len(keys))
|
||||
args := make([]interface{}, len(keys))
|
||||
for i, key := range keys {
|
||||
placeholders[i] = k.dialect.ArgPlaceholder(i + 1)
|
||||
args[i] = key
|
||||
}
|
||||
|
||||
// Build the query
|
||||
query := fmt.Sprintf(
|
||||
"DELETE FROM %s WHERE %s IN (%s)",
|
||||
tableName,
|
||||
keyPathIdent,
|
||||
strings.Join(placeholders, ", "),
|
||||
)
|
||||
|
||||
// Execute the query (idempotent - non-existent keys are silently ignored)
|
||||
_, err = k.db.ExecContext(ctx, query, args...)
|
||||
if err != nil {
|
||||
return fmt.Errorf("batch delete failed: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// UnixTimestamp returns the current time in seconds since Epoch
|
||||
func (k *sqlKV) UnixTimestamp(ctx context.Context) (int64, error) {
|
||||
return time.Now().Unix(), nil
|
||||
}
|
||||
|
||||
// Ping checks if the database connection is alive
|
||||
func (k *sqlKV) Ping(ctx context.Context) error {
|
||||
if k.db == nil {
|
||||
return fmt.Errorf("database connection is nil")
|
||||
}
|
||||
return k.db.PingContext(ctx)
|
||||
}
|
||||
|
||||
// checkDB verifies the database connection is still valid before operations
|
||||
func (k *sqlKV) checkDB() error {
|
||||
if k.db == nil {
|
||||
return fmt.Errorf("database connection is nil")
|
||||
}
|
||||
// Quick ping to verify connection is alive
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||
defer cancel()
|
||||
if err := k.db.PingContext(ctx); err != nil {
|
||||
return fmt.Errorf("database connection is not healthy: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -66,9 +66,15 @@ type kvStorageBackend struct {
|
||||
withExperimentalClusterScope bool
|
||||
//tracer trace.Tracer
|
||||
//reg prometheus.Registerer
|
||||
|
||||
// lifecycle management
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
var _ StorageBackend = &kvStorageBackend{}
|
||||
var _ LifecycleHooks = &kvStorageBackend{}
|
||||
var _ resourcepb.DiagnosticsServer = &kvStorageBackend{}
|
||||
|
||||
type KVBackendOptions struct {
|
||||
KvStore KV
|
||||
@@ -81,7 +87,6 @@ type KVBackendOptions struct {
|
||||
}
|
||||
|
||||
func NewKVStorageBackend(opts KVBackendOptions) (StorageBackend, error) {
|
||||
ctx := context.Background()
|
||||
kv := opts.KvStore
|
||||
|
||||
s, err := snowflake.NewNode(rand.Int64N(1024))
|
||||
@@ -100,6 +105,9 @@ func NewKVStorageBackend(opts KVBackendOptions) (StorageBackend, error) {
|
||||
eventPruningInterval = defaultEventPruningInterval
|
||||
}
|
||||
|
||||
// Create a cancellable context for lifecycle management
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
backend := &kvStorageBackend{
|
||||
kv: kv,
|
||||
dataStore: newDataStore(kv),
|
||||
@@ -111,18 +119,58 @@ func NewKVStorageBackend(opts KVBackendOptions) (StorageBackend, error) {
|
||||
eventRetentionPeriod: eventRetentionPeriod,
|
||||
eventPruningInterval: eventPruningInterval,
|
||||
withExperimentalClusterScope: opts.WithExperimentalClusterScope,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
}
|
||||
err = backend.initPruner(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to initialize pruner: %w", err)
|
||||
}
|
||||
|
||||
// Start the event cleanup background job
|
||||
go backend.runCleanupOldEvents(ctx)
|
||||
|
||||
return backend, nil
|
||||
}
|
||||
|
||||
// Init implements LifecycleHooks
|
||||
func (k *kvStorageBackend) Init(ctx context.Context) error {
|
||||
// Initialize the pruner
|
||||
if err := k.initPruner(ctx); err != nil {
|
||||
return fmt.Errorf("failed to initialize pruner: %w", err)
|
||||
}
|
||||
|
||||
// Start the event cleanup background job using the backend's lifecycle context
|
||||
go k.runCleanupOldEvents(k.ctx)
|
||||
|
||||
// Start the pruner if it was configured
|
||||
if k.historyPruner != nil {
|
||||
k.historyPruner.Start(k.ctx)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop implements LifecycleHooks
|
||||
func (k *kvStorageBackend) Stop(ctx context.Context) error {
|
||||
// Cancel the context to stop background goroutines
|
||||
// This will stop both runCleanupOldEvents and the pruner (via debouncer)
|
||||
k.cancel()
|
||||
return nil
|
||||
}
|
||||
|
||||
// IsHealthy implements DiagnosticsServer
|
||||
func (k *kvStorageBackend) IsHealthy(ctx context.Context, _ *resourcepb.HealthCheckRequest) (*resourcepb.HealthCheckResponse, error) {
|
||||
// Check if the underlying KV store supports Ping (e.g., sqlKV)
|
||||
type pinger interface {
|
||||
Ping(context.Context) error
|
||||
}
|
||||
if p, ok := k.kv.(pinger); ok {
|
||||
if err := p.Ping(ctx); err != nil {
|
||||
return nil, fmt.Errorf("KV store health check failed: %w", err)
|
||||
}
|
||||
}
|
||||
return &resourcepb.HealthCheckResponse{Status: resourcepb.HealthCheckResponse_SERVING}, nil
|
||||
}
|
||||
|
||||
// Read implements DiagnosticsServer
|
||||
func (k *kvStorageBackend) Read(ctx context.Context, req *resourcepb.ReadRequest) (*resourcepb.ReadResponse, error) {
|
||||
return nil, ErrNotImplementedYet
|
||||
}
|
||||
|
||||
// runCleanupOldEvents starts a background goroutine that periodically cleans up old events
|
||||
func (k *kvStorageBackend) runCleanupOldEvents(ctx context.Context) {
|
||||
// Run cleanup every hour
|
||||
@@ -217,7 +265,6 @@ func (k *kvStorageBackend) initPruner(ctx context.Context) error {
|
||||
}
|
||||
|
||||
k.historyPruner = pruner
|
||||
k.historyPruner.Start(ctx)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -1,11 +1,45 @@
|
||||
UPDATE {{ .Ident "resource_history" }}
|
||||
SET {{ .Ident "resource_version" }} = (
|
||||
SET
|
||||
{{ .Ident "resource_version" }} = (
|
||||
CASE
|
||||
{{ range $guid, $rv := .GUIDToRV }}
|
||||
WHEN {{ $.Ident "guid" }} = {{ $.Arg $guid }} THEN CAST({{ $.Arg $rv }} AS {{ if eq $.DialectName "postgres" }}BIGINT{{ else }}SIGNED{{ end }})
|
||||
{{ end }}
|
||||
END
|
||||
),
|
||||
{{ .Ident "key_path" }} = {{ if eq .DialectName "sqlite" -}}
|
||||
{{ .Ident "group" }} || CHAR(47) || {{ .Ident "resource" }} || CHAR(47) || {{ .Ident "namespace" }} || CHAR(47) || {{ .Ident "name" }} || CHAR(47) ||
|
||||
CAST((CASE
|
||||
{{- range $guid, $rv := .GUIDToRV }}
|
||||
WHEN {{ $.Ident "guid" }} = {{ $.Arg $guid }} THEN ((({{ $.Arg $rv }} / 1000) - 1288834974657) * 4194304) + ({{ $.Arg $rv }} % 1000)
|
||||
{{- end }}
|
||||
END) AS TEXT) || CHAR(126) ||
|
||||
CASE {{ .Ident "action" }}
|
||||
WHEN 1 THEN 'created'
|
||||
WHEN 2 THEN 'updated'
|
||||
WHEN 3 THEN 'deleted'
|
||||
ELSE 'unknown'
|
||||
END || CHAR(126) || COALESCE({{ .Ident "folder" }}, '')
|
||||
{{- else -}}
|
||||
CONCAT(
|
||||
{{ .Ident "group" }}, CHAR(47),
|
||||
{{ .Ident "resource" }}, CHAR(47),
|
||||
{{ .Ident "namespace" }}, CHAR(47),
|
||||
{{ .Ident "name" }}, CHAR(47),
|
||||
CAST((CASE
|
||||
{{- range $guid, $rv := .GUIDToRV }}
|
||||
WHEN {{ $.Ident "guid" }} = {{ $.Arg $guid }} THEN ((({{ $.Arg $rv }} DIV 1000) - 1288834974657) * 4194304) + ({{ $.Arg $rv }} MOD 1000)
|
||||
{{- end }}
|
||||
END) AS {{ if eq .DialectName "postgres" }}TEXT{{ else }}CHAR{{ end }}), CHAR(126),
|
||||
CASE {{ .Ident "action" }}
|
||||
WHEN 1 THEN 'created'
|
||||
WHEN 2 THEN 'updated'
|
||||
WHEN 3 THEN 'deleted'
|
||||
ELSE 'unknown'
|
||||
END, CHAR(126),
|
||||
COALESCE({{ .Ident "folder" }}, '')
|
||||
)
|
||||
{{- end }}
|
||||
WHERE {{ .Ident "guid" }} IN (
|
||||
{{$first := true}}
|
||||
{{ range $guid, $rv := .GUIDToRV }}{{if $first}}{{$first = false}}{{else}}, {{end}}{{ $.Arg $guid }}{{ end }}
|
||||
|
||||
@@ -3,7 +3,9 @@ package migrations
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
|
||||
"github.com/grafana/grafana/pkg/util/xorm"
|
||||
)
|
||||
|
||||
func initResourceTables(mg *migrator.Migrator) string {
|
||||
@@ -185,5 +187,170 @@ func initResourceTables(mg *migrator.Migrator) string {
|
||||
Name: "UQE_resource_last_import_time_last_import_time",
|
||||
}))
|
||||
|
||||
// TODO: Do we want the value to be MEDIUMTEXT ?
|
||||
// TODO: What's the best name for the key_path column?
|
||||
|
||||
// Add key_path column to resource_history for KV interface
|
||||
mg.AddMigration("Add key_path column to resource_history", migrator.NewAddColumnMigration(resource_history_table, &migrator.Column{
|
||||
Name: "key_path", Type: migrator.DB_NVarchar, Length: 2048, Nullable: true,
|
||||
}))
|
||||
|
||||
// Backfill key_path column in resource_history
|
||||
mg.AddMigration("Backfill key_path column in resource_history", &resourceHistoryKeyBackfillMigrator{})
|
||||
|
||||
// Note: key_path remains nullable because the write pattern is:
|
||||
// 1. INSERT (key_path = NULL)
|
||||
// 2. UPDATE (key_path = actual value after RV allocation)
|
||||
|
||||
// Add index on key_path column
|
||||
mg.AddMigration("Add index on key_path column in resource_history", migrator.NewAddIndexMigration(resource_history_table, &migrator.Index{
|
||||
Name: "IDX_resource_history_key_path",
|
||||
Cols: []string{"key_path"},
|
||||
Type: migrator.IndexType,
|
||||
}))
|
||||
|
||||
// Create resource_events table for KV interface
|
||||
resource_events_table := migrator.Table{
|
||||
Name: "resource_events",
|
||||
Columns: []*migrator.Column{
|
||||
{Name: "key_path", Type: migrator.DB_NVarchar, Length: 2048, Nullable: false, IsPrimaryKey: true},
|
||||
{Name: "value", Type: migrator.DB_MediumText, Nullable: false},
|
||||
},
|
||||
}
|
||||
mg.AddMigration("create table "+resource_events_table.Name, migrator.NewAddTableMigration(resource_events_table))
|
||||
|
||||
return marker
|
||||
}
|
||||
|
||||
// resourceHistoryKeyBackfillMigrator backfills the key_path column in resource_history table
|
||||
// It processes rows in batches to reduce lock duration and avoid timeouts on large tables
|
||||
type resourceHistoryKeyBackfillMigrator struct {
|
||||
migrator.MigrationBase
|
||||
}
|
||||
|
||||
func (m *resourceHistoryKeyBackfillMigrator) SQL(dialect migrator.Dialect) string {
|
||||
return "Backfill key_path column in resource_history using pattern: {Group}/{Resource}/{Namespace}/{Name}/{ResourceVersion}~{Action}~{Folder}"
|
||||
}
|
||||
|
||||
func (m *resourceHistoryKeyBackfillMigrator) Exec(sess *xorm.Session, mg *migrator.Migrator) error {
|
||||
dialect := mg.Dialect.DriverName()
|
||||
logger := log.New("resource-history-key-backfill")
|
||||
|
||||
// TODO: Verify the RV to Snowflake ID conversion is correct.
|
||||
|
||||
// Snowflake ID epoch in milliseconds (2010-11-04T01:42:54.657Z)
|
||||
const epochMs = 1288834974657
|
||||
const batchSize = 1000 // Process 1000 rows at a time
|
||||
|
||||
// Count total rows to backfill
|
||||
totalCount, err := sess.Table("resource_history").Where("key_path IS NULL").Count()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to count rows: %w", err)
|
||||
}
|
||||
|
||||
if totalCount == 0 {
|
||||
logger.Info("No rows to backfill")
|
||||
return nil
|
||||
}
|
||||
|
||||
logger.Info("Starting key_path backfill", "total_rows", totalCount)
|
||||
|
||||
// Build the SQL query based on the database dialect
|
||||
var updateSQL string
|
||||
|
||||
switch dialect {
|
||||
case "mysql":
|
||||
updateSQL = `
|
||||
UPDATE resource_history
|
||||
SET key_path = CONCAT(
|
||||
` + "`group`" + `, '/',
|
||||
` + "`resource`" + `, '/',
|
||||
` + "`namespace`" + `, '/',
|
||||
` + "`name`" + `, '/',
|
||||
CAST((((` + "`resource_version`" + ` DIV 1000) - ?) * 4194304) + (` + "`resource_version`" + ` MOD 1000) AS CHAR), '~',
|
||||
CASE ` + "`action`" + `
|
||||
WHEN 1 THEN 'created'
|
||||
WHEN 2 THEN 'updated'
|
||||
WHEN 3 THEN 'deleted'
|
||||
ELSE 'unknown'
|
||||
END, '~',
|
||||
COALESCE(` + "`folder`" + `, '')
|
||||
)
|
||||
WHERE key_path IS NULL
|
||||
LIMIT ?
|
||||
`
|
||||
case "postgres":
|
||||
updateSQL = `
|
||||
UPDATE resource_history
|
||||
SET key_path = CONCAT(
|
||||
"group", '/',
|
||||
"resource", '/',
|
||||
"namespace", '/',
|
||||
"name", '/',
|
||||
CAST((((resource_version / 1000) - $1) * 4194304) + (resource_version % 1000) AS BIGINT), '~',
|
||||
CASE "action"
|
||||
WHEN 1 THEN 'created'
|
||||
WHEN 2 THEN 'updated'
|
||||
WHEN 3 THEN 'deleted'
|
||||
ELSE 'unknown'
|
||||
END, '~',
|
||||
COALESCE("folder", '')
|
||||
)
|
||||
WHERE guid IN (
|
||||
SELECT guid FROM resource_history
|
||||
WHERE key_path IS NULL
|
||||
LIMIT $2
|
||||
)
|
||||
`
|
||||
case "sqlite3":
|
||||
updateSQL = `
|
||||
UPDATE resource_history
|
||||
SET key_path =
|
||||
"group" || '/' ||
|
||||
resource || '/' ||
|
||||
namespace || '/' ||
|
||||
name || '/' ||
|
||||
CAST((((resource_version / 1000) - ?) * 4194304) + (resource_version % 1000) AS TEXT) || '~' ||
|
||||
CASE action
|
||||
WHEN 1 THEN 'created'
|
||||
WHEN 2 THEN 'updated'
|
||||
WHEN 3 THEN 'deleted'
|
||||
ELSE 'unknown'
|
||||
END || '~' ||
|
||||
COALESCE(folder, '')
|
||||
WHERE guid IN (
|
||||
SELECT guid FROM resource_history
|
||||
WHERE key_path IS NULL
|
||||
LIMIT ?
|
||||
)
|
||||
`
|
||||
default:
|
||||
return fmt.Errorf("unsupported database dialect: %s", dialect)
|
||||
}
|
||||
|
||||
// Process in batches
|
||||
processed := int64(0)
|
||||
for {
|
||||
result, err := sess.Exec(updateSQL, epochMs, batchSize)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to update batch: %w", err)
|
||||
}
|
||||
|
||||
rowsAffected, err := result.RowsAffected()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get rows affected: %w", err)
|
||||
}
|
||||
|
||||
processed += rowsAffected
|
||||
logger.Info("Backfill progress", "processed", processed, "total", totalCount,
|
||||
"percent", fmt.Sprintf("%.1f%%", float64(processed)/float64(totalCount)*100))
|
||||
|
||||
// If we updated fewer rows than batch size, we're done
|
||||
if rowsAffected < int64(batchSize) {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
logger.Info("Backfill completed", "total_processed", processed)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -19,6 +19,7 @@ import (
|
||||
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resource"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/sql/db/dbimpl"
|
||||
)
|
||||
|
||||
@@ -101,6 +102,30 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
|
||||
//nolint:staticcheck // not yet migrated to OpenFeature
|
||||
withPruner := opts.Features.IsEnabledGlobally(featuremgmt.FlagUnifiedStorageHistoryPruner)
|
||||
|
||||
// Check if KV backend is enabled via feature flag
|
||||
//nolint:staticcheck // not yet migrated to OpenFeature
|
||||
if opts.Features.IsEnabledGlobally(featuremgmt.FlagUnifiedStorageKVBackend) {
|
||||
// Create SQL KV instance
|
||||
sqlKV, err := resource.NewSQLKV(eDB)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create SQL KV: %w", err)
|
||||
}
|
||||
|
||||
// Use existing KV storage backend (already implements StorageBackend interface)
|
||||
kvBackend, err := resource.NewKVStorageBackend(resource.KVBackendOptions{
|
||||
KvStore: sqlKV,
|
||||
WithPruner: withPruner,
|
||||
Tracer: opts.Tracer,
|
||||
Reg: opts.Reg,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create KV backend: %w", err)
|
||||
}
|
||||
serverOptions.Backend = kvBackend
|
||||
serverOptions.Lifecycle = kvBackend.(resource.LifecycleHooks)
|
||||
serverOptions.Diagnostics = kvBackend.(resourcepb.DiagnosticsServer)
|
||||
} else {
|
||||
// Use existing SQL backend
|
||||
backend, err := NewBackend(BackendOptions{
|
||||
DBProvider: eDB,
|
||||
Tracer: opts.Tracer,
|
||||
@@ -117,6 +142,7 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
|
||||
serverOptions.Diagnostics = backend
|
||||
serverOptions.Lifecycle = backend
|
||||
}
|
||||
}
|
||||
|
||||
serverOptions.Search = opts.SearchOptions
|
||||
serverOptions.IndexMetrics = opts.IndexMetrics
|
||||
|
||||
Reference in New Issue
Block a user