mirror of
https://github.com/grafana/grafana.git
synced 2025-12-21 03:54:29 +08:00
Compare commits
7 Commits
zoltan/pos
...
sql-kvstor
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f634f37b5f | ||
|
|
f5ef25233d | ||
|
|
e156dfd92a | ||
|
|
fa3d906d41 | ||
|
|
a8c86de2d6 | ||
|
|
761a8f6c31 | ||
|
|
3c0fc606ae |
@@ -912,6 +912,10 @@ export interface FeatureToggles {
|
|||||||
*/
|
*/
|
||||||
unifiedStorageGrpcConnectionPool?: boolean;
|
unifiedStorageGrpcConnectionPool?: boolean;
|
||||||
/**
|
/**
|
||||||
|
* Use KV-backed SQL storage backend instead of direct SQL queries
|
||||||
|
*/
|
||||||
|
unifiedStorageKVBackend?: boolean;
|
||||||
|
/**
|
||||||
* Enables UI functionality to permanently delete alert rules
|
* Enables UI functionality to permanently delete alert rules
|
||||||
* @default true
|
* @default true
|
||||||
*/
|
*/
|
||||||
|
|||||||
@@ -1580,6 +1580,14 @@ var (
|
|||||||
HideFromAdminPage: true,
|
HideFromAdminPage: true,
|
||||||
HideFromDocs: true,
|
HideFromDocs: true,
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
Name: "unifiedStorageKVBackend",
|
||||||
|
Description: "Use KV-backed SQL storage backend instead of direct SQL queries",
|
||||||
|
Stage: FeatureStageExperimental,
|
||||||
|
Owner: grafanaSearchAndStorageSquad,
|
||||||
|
HideFromAdminPage: true,
|
||||||
|
HideFromDocs: true,
|
||||||
|
},
|
||||||
{
|
{
|
||||||
Name: "alertingRulePermanentlyDelete",
|
Name: "alertingRulePermanentlyDelete",
|
||||||
Description: "Enables UI functionality to permanently delete alert rules",
|
Description: "Enables UI functionality to permanently delete alert rules",
|
||||||
|
|||||||
1
pkg/services/featuremgmt/toggles_gen.csv
generated
1
pkg/services/featuremgmt/toggles_gen.csv
generated
@@ -205,6 +205,7 @@ unifiedStorageHistoryPruner,GA,@grafana/search-and-storage,false,false,false
|
|||||||
azureMonitorLogsBuilderEditor,preview,@grafana/partner-datasources,false,false,false
|
azureMonitorLogsBuilderEditor,preview,@grafana/partner-datasources,false,false,false
|
||||||
localeFormatPreference,preview,@grafana/grafana-frontend-platform,false,false,false
|
localeFormatPreference,preview,@grafana/grafana-frontend-platform,false,false,false
|
||||||
unifiedStorageGrpcConnectionPool,experimental,@grafana/search-and-storage,false,false,false
|
unifiedStorageGrpcConnectionPool,experimental,@grafana/search-and-storage,false,false,false
|
||||||
|
unifiedStorageKVBackend,experimental,@grafana/search-and-storage,false,false,false
|
||||||
alertingRulePermanentlyDelete,GA,@grafana/alerting-squad,false,false,true
|
alertingRulePermanentlyDelete,GA,@grafana/alerting-squad,false,false,true
|
||||||
alertingRuleRecoverDeleted,GA,@grafana/alerting-squad,false,false,true
|
alertingRuleRecoverDeleted,GA,@grafana/alerting-squad,false,false,true
|
||||||
multiTenantTempCredentials,experimental,@grafana/aws-datasources,false,false,false
|
multiTenantTempCredentials,experimental,@grafana/aws-datasources,false,false,false
|
||||||
|
|||||||
|
4
pkg/services/featuremgmt/toggles_gen.go
generated
4
pkg/services/featuremgmt/toggles_gen.go
generated
@@ -830,6 +830,10 @@ const (
|
|||||||
// Enables the unified storage grpc connection pool
|
// Enables the unified storage grpc connection pool
|
||||||
FlagUnifiedStorageGrpcConnectionPool = "unifiedStorageGrpcConnectionPool"
|
FlagUnifiedStorageGrpcConnectionPool = "unifiedStorageGrpcConnectionPool"
|
||||||
|
|
||||||
|
// FlagUnifiedStorageKVBackend
|
||||||
|
// Use KV-backed SQL storage backend instead of direct SQL queries
|
||||||
|
FlagUnifiedStorageKVBackend = "unifiedStorageKVBackend"
|
||||||
|
|
||||||
// FlagAlertingRulePermanentlyDelete
|
// FlagAlertingRulePermanentlyDelete
|
||||||
// Enables UI functionality to permanently delete alert rules
|
// Enables UI functionality to permanently delete alert rules
|
||||||
FlagAlertingRulePermanentlyDelete = "alertingRulePermanentlyDelete"
|
FlagAlertingRulePermanentlyDelete = "alertingRulePermanentlyDelete"
|
||||||
|
|||||||
14
pkg/services/featuremgmt/toggles_gen.json
generated
14
pkg/services/featuremgmt/toggles_gen.json
generated
@@ -4208,6 +4208,20 @@
|
|||||||
"expression": "true"
|
"expression": "true"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"metadata": {
|
||||||
|
"name": "unifiedStorageKVBackend",
|
||||||
|
"resourceVersion": "1763461706359",
|
||||||
|
"creationTimestamp": "2025-11-18T10:28:26Z"
|
||||||
|
},
|
||||||
|
"spec": {
|
||||||
|
"description": "Use KV-backed SQL storage backend instead of direct SQL queries",
|
||||||
|
"stage": "experimental",
|
||||||
|
"codeowner": "@grafana/search-and-storage",
|
||||||
|
"hideFromAdminPage": true,
|
||||||
|
"hideFromDocs": true
|
||||||
|
}
|
||||||
|
},
|
||||||
{
|
{
|
||||||
"metadata": {
|
"metadata": {
|
||||||
"name": "unifiedStorageSearch",
|
"name": "unifiedStorageSearch",
|
||||||
|
|||||||
835
pkg/storage/unified/resource/sqlkv.go
Normal file
835
pkg/storage/unified/resource/sqlkv.go
Normal file
@@ -0,0 +1,835 @@
|
|||||||
|
package resource
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"iter"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/bwmarrin/snowflake"
|
||||||
|
"github.com/google/uuid"
|
||||||
|
"github.com/grafana/grafana/pkg/storage/unified/sql/db"
|
||||||
|
"github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
sectionData = "unified/data"
|
||||||
|
sectionEvents = "unified/events"
|
||||||
|
)
|
||||||
|
|
||||||
|
// sqlKV implements the KV interface using SQL storage
|
||||||
|
type sqlKV struct {
|
||||||
|
dbProvider db.DBProvider // Keep reference to prevent GC
|
||||||
|
db db.DB
|
||||||
|
dialect sqltemplate.Dialect
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewSQLKV creates a new SQL-based KV store
|
||||||
|
func NewSQLKV(dbProvider db.DBProvider) (KV, error) {
|
||||||
|
if dbProvider == nil {
|
||||||
|
return nil, errors.New("dbProvider is required")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Initialize the database connection
|
||||||
|
ctx := context.Background()
|
||||||
|
dbConn, err := dbProvider.Init(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("initialize DB: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Determine the SQL dialect
|
||||||
|
var dialect sqltemplate.Dialect
|
||||||
|
switch dbConn.DriverName() {
|
||||||
|
case "mysql":
|
||||||
|
dialect = sqltemplate.MySQL
|
||||||
|
case "postgres":
|
||||||
|
dialect = sqltemplate.PostgreSQL
|
||||||
|
case "sqlite3", "sqlite":
|
||||||
|
dialect = sqltemplate.SQLite
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("unsupported database driver: %s", dbConn.DriverName())
|
||||||
|
}
|
||||||
|
|
||||||
|
return &sqlKV{
|
||||||
|
dbProvider: dbProvider, // Keep reference to prevent GC from closing the database
|
||||||
|
db: dbConn,
|
||||||
|
dialect: dialect,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify that sqlKV implements KV interface
|
||||||
|
var _ KV = &sqlKV{}
|
||||||
|
|
||||||
|
// Helper function to build identifiers safely
|
||||||
|
func (k *sqlKV) ident(name string) (string, error) {
|
||||||
|
return k.dialect.Ident(name)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Helper function to get table name for a section
|
||||||
|
func (k *sqlKV) getTableName(section string) (string, error) {
|
||||||
|
switch section {
|
||||||
|
case sectionData:
|
||||||
|
return k.ident("resource_history")
|
||||||
|
case sectionEvents:
|
||||||
|
return k.ident("resource_events")
|
||||||
|
default:
|
||||||
|
return "", fmt.Errorf("unsupported section: %s", section)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// parsedKey represents the components of a key_path for the data section
|
||||||
|
// Format: {Group}/{Resource}/{Namespace}/{Name}/{ResourceVersion}~{Action}~{Folder}
|
||||||
|
type parsedKey struct {
|
||||||
|
Group string
|
||||||
|
Resource string
|
||||||
|
Namespace string
|
||||||
|
Name string
|
||||||
|
ResourceVersion int64 // Microsecond timestamp
|
||||||
|
ResourceVersionSnowflake int64 // Original snowflake ID from key
|
||||||
|
Action int // 1: create, 2: update, 3: delete
|
||||||
|
Folder string
|
||||||
|
}
|
||||||
|
|
||||||
|
// snowflakeToMicroseconds converts a snowflake ID to a unix microsecond timestamp
|
||||||
|
// Uses the snowflake library's Time() method which handles the Grafana epoch internally
|
||||||
|
func snowflakeToMicroseconds(snowflakeID int64) int64 {
|
||||||
|
// Extract unix milliseconds from snowflake (handles Grafana epoch internally)
|
||||||
|
unixMilliseconds := snowflake.ID(snowflakeID).Time()
|
||||||
|
|
||||||
|
// Extract sequence number (low 12 bits) for sub-millisecond precision
|
||||||
|
sequence := snowflakeID & 0xFFF // 0xFFF = 4095 = 2^12 - 1
|
||||||
|
|
||||||
|
// Convert to unix microseconds: (unix_ms * 1000) + sequence
|
||||||
|
return (unixMilliseconds * 1000) + sequence
|
||||||
|
}
|
||||||
|
|
||||||
|
// parseDataKey parses a data section key_path
|
||||||
|
func parseDataKey(keyPath string) (*parsedKey, error) {
|
||||||
|
// Split by ~ to separate main key from action and folder
|
||||||
|
parts := strings.Split(keyPath, "~")
|
||||||
|
if len(parts) != 3 {
|
||||||
|
return nil, fmt.Errorf("invalid key format: expected 3 parts separated by '~', got %d", len(parts))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Split main key by /
|
||||||
|
mainParts := strings.Split(parts[0], "/")
|
||||||
|
if len(mainParts) != 5 {
|
||||||
|
return nil, fmt.Errorf("invalid key format: expected 5 parts separated by '/', got %d", len(mainParts))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parse resource version (stored as snowflake ID in key)
|
||||||
|
snowflakeID, err := strconv.ParseInt(mainParts[4], 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("invalid resource_version: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Convert snowflake ID to microsecond timestamp for database storage
|
||||||
|
microseconds := snowflakeToMicroseconds(snowflakeID)
|
||||||
|
|
||||||
|
// Convert action string to int
|
||||||
|
var action int
|
||||||
|
switch parts[1] {
|
||||||
|
case "created":
|
||||||
|
action = 1
|
||||||
|
case "updated":
|
||||||
|
action = 2
|
||||||
|
case "deleted":
|
||||||
|
action = 3
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("invalid action: %s", parts[1])
|
||||||
|
}
|
||||||
|
|
||||||
|
return &parsedKey{
|
||||||
|
Group: mainParts[0],
|
||||||
|
Resource: mainParts[1],
|
||||||
|
Namespace: mainParts[2],
|
||||||
|
Name: mainParts[3],
|
||||||
|
ResourceVersion: microseconds, // Microsecond timestamp for DB
|
||||||
|
ResourceVersionSnowflake: snowflakeID, // Original snowflake ID
|
||||||
|
Action: action,
|
||||||
|
Folder: parts[2], // May be empty string
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get retrieves the value for a key from the store
|
||||||
|
func (k *sqlKV) Get(ctx context.Context, section string, key string) (io.ReadCloser, error) {
|
||||||
|
if section == "" {
|
||||||
|
return nil, fmt.Errorf("section is required")
|
||||||
|
}
|
||||||
|
if key == "" {
|
||||||
|
return nil, fmt.Errorf("key is required")
|
||||||
|
}
|
||||||
|
|
||||||
|
tableName, err := k.getTableName(section)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
valueIdent, err := k.ident("value")
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("invalid column identifier: %w", err)
|
||||||
|
}
|
||||||
|
keyPathIdent, err := k.ident("key_path")
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("invalid column identifier: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
query := fmt.Sprintf(
|
||||||
|
"SELECT %s FROM %s WHERE %s = %s",
|
||||||
|
valueIdent,
|
||||||
|
tableName,
|
||||||
|
keyPathIdent,
|
||||||
|
k.dialect.ArgPlaceholder(1),
|
||||||
|
)
|
||||||
|
|
||||||
|
// Execute the query
|
||||||
|
var value []byte
|
||||||
|
err = k.db.QueryRowContext(ctx, query, key).Scan(&value)
|
||||||
|
if err != nil {
|
||||||
|
if errors.Is(err, sql.ErrNoRows) {
|
||||||
|
return nil, ErrNotFound
|
||||||
|
}
|
||||||
|
return nil, fmt.Errorf("query failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return io.NopCloser(bytes.NewReader(value)), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// BatchGet retrieves multiple values for the given keys from the store
|
||||||
|
// Uses a UNION ALL subquery with LEFT JOIN to preserve order and enable streaming
|
||||||
|
func (k *sqlKV) BatchGet(ctx context.Context, section string, keys []string) iter.Seq2[KeyValue, error] {
|
||||||
|
if section == "" {
|
||||||
|
return func(yield func(KeyValue, error) bool) {
|
||||||
|
yield(KeyValue{}, fmt.Errorf("section is required"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(keys) == 0 {
|
||||||
|
return func(yield func(KeyValue, error) bool) {
|
||||||
|
// Empty result set - nothing to yield
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return func(yield func(KeyValue, error) bool) {
|
||||||
|
tableName, err := k.getTableName(section)
|
||||||
|
if err != nil {
|
||||||
|
yield(KeyValue{}, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
keyPathIdent, err := k.ident("key_path")
|
||||||
|
if err != nil {
|
||||||
|
yield(KeyValue{}, fmt.Errorf("invalid column identifier: %w", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
valueIdent, err := k.ident("value")
|
||||||
|
if err != nil {
|
||||||
|
yield(KeyValue{}, fmt.Errorf("invalid column identifier: %w", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build UNION ALL subquery to preserve key order
|
||||||
|
// SELECT 0 AS idx, ? AS kp UNION ALL SELECT 1, ? UNION ALL ...
|
||||||
|
var unionParts []string
|
||||||
|
var args []interface{}
|
||||||
|
argNum := 1
|
||||||
|
|
||||||
|
for i, key := range keys {
|
||||||
|
if i == 0 {
|
||||||
|
unionParts = append(unionParts, fmt.Sprintf(
|
||||||
|
"SELECT %s AS idx, %s AS kp",
|
||||||
|
k.dialect.ArgPlaceholder(argNum),
|
||||||
|
k.dialect.ArgPlaceholder(argNum+1),
|
||||||
|
))
|
||||||
|
} else {
|
||||||
|
unionParts = append(unionParts, fmt.Sprintf(
|
||||||
|
"UNION ALL SELECT %s, %s",
|
||||||
|
k.dialect.ArgPlaceholder(argNum),
|
||||||
|
k.dialect.ArgPlaceholder(argNum+1),
|
||||||
|
))
|
||||||
|
}
|
||||||
|
args = append(args, i, key)
|
||||||
|
argNum += 2
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build the full query with LEFT JOIN to preserve order
|
||||||
|
// This allows streaming results directly without buffering
|
||||||
|
query := fmt.Sprintf(
|
||||||
|
"SELECT v.idx, t.%s, t.%s FROM (%s) AS v LEFT JOIN %s t ON t.%s = v.kp ORDER BY v.idx",
|
||||||
|
keyPathIdent,
|
||||||
|
valueIdent,
|
||||||
|
strings.Join(unionParts, " "),
|
||||||
|
tableName,
|
||||||
|
keyPathIdent,
|
||||||
|
)
|
||||||
|
|
||||||
|
// Execute the query
|
||||||
|
rows, err := k.db.QueryContext(ctx, query, args...)
|
||||||
|
if err != nil {
|
||||||
|
yield(KeyValue{}, fmt.Errorf("query failed: %w", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
// Stream results directly - no buffering needed!
|
||||||
|
// Results come back in the order specified by idx
|
||||||
|
for rows.Next() {
|
||||||
|
var idx int
|
||||||
|
var keyPath sql.NullString
|
||||||
|
var value []byte
|
||||||
|
|
||||||
|
if err := rows.Scan(&idx, &keyPath, &value); err != nil {
|
||||||
|
yield(KeyValue{}, fmt.Errorf("scan failed: %w", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Skip keys that don't exist (LEFT JOIN returns NULL)
|
||||||
|
if !keyPath.Valid {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
kv := KeyValue{
|
||||||
|
Key: keyPath.String,
|
||||||
|
Value: io.NopCloser(bytes.NewReader(value)),
|
||||||
|
}
|
||||||
|
|
||||||
|
if !yield(kv, nil) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := rows.Err(); err != nil {
|
||||||
|
yield(KeyValue{}, fmt.Errorf("rows error: %w", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Keys returns all the keys in the store
|
||||||
|
func (k *sqlKV) Keys(ctx context.Context, section string, opt ListOptions) iter.Seq2[string, error] {
|
||||||
|
if section == "" {
|
||||||
|
return func(yield func(string, error) bool) {
|
||||||
|
yield("", fmt.Errorf("section is required"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return func(yield func(string, error) bool) {
|
||||||
|
tableName, err := k.getTableName(section)
|
||||||
|
if err != nil {
|
||||||
|
yield("", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
keyPathIdent, err := k.ident("key_path")
|
||||||
|
if err != nil {
|
||||||
|
yield("", fmt.Errorf("invalid column identifier: %w", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build WHERE clauses
|
||||||
|
var whereClauses []string
|
||||||
|
var args []interface{}
|
||||||
|
argNum := 1
|
||||||
|
|
||||||
|
// Start key (inclusive)
|
||||||
|
if opt.StartKey != "" {
|
||||||
|
whereClauses = append(whereClauses, fmt.Sprintf("%s >= %s", keyPathIdent, k.dialect.ArgPlaceholder(argNum)))
|
||||||
|
args = append(args, opt.StartKey)
|
||||||
|
argNum++
|
||||||
|
}
|
||||||
|
|
||||||
|
// End key (exclusive)
|
||||||
|
if opt.EndKey != "" {
|
||||||
|
whereClauses = append(whereClauses, fmt.Sprintf("%s < %s", keyPathIdent, k.dialect.ArgPlaceholder(argNum)))
|
||||||
|
args = append(args, opt.EndKey)
|
||||||
|
argNum++
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build ORDER BY clause
|
||||||
|
orderBy := "ASC"
|
||||||
|
if opt.Sort == SortOrderDesc {
|
||||||
|
orderBy = "DESC"
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build the query
|
||||||
|
query := fmt.Sprintf(
|
||||||
|
"SELECT %s FROM %s",
|
||||||
|
keyPathIdent,
|
||||||
|
tableName,
|
||||||
|
)
|
||||||
|
|
||||||
|
if len(whereClauses) > 0 {
|
||||||
|
query += " WHERE " + strings.Join(whereClauses, " AND ")
|
||||||
|
}
|
||||||
|
|
||||||
|
query += fmt.Sprintf(" ORDER BY %s %s", keyPathIdent, orderBy)
|
||||||
|
|
||||||
|
if opt.Limit > 0 {
|
||||||
|
query += fmt.Sprintf(" LIMIT %d", opt.Limit)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Execute the query
|
||||||
|
rows, err := k.db.QueryContext(ctx, query, args...)
|
||||||
|
if err != nil {
|
||||||
|
yield("", fmt.Errorf("query failed: %w", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
// Yield each key
|
||||||
|
for rows.Next() {
|
||||||
|
var keyPath string
|
||||||
|
if err := rows.Scan(&keyPath); err != nil {
|
||||||
|
yield("", fmt.Errorf("scan failed: %w", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if !yield(keyPath, nil) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := rows.Err(); err != nil {
|
||||||
|
yield("", fmt.Errorf("rows error: %w", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Save a new value - returns a WriteCloser to write the value to
|
||||||
|
func (k *sqlKV) Save(ctx context.Context, section string, key string) (io.WriteCloser, error) {
|
||||||
|
if section == "" {
|
||||||
|
return nil, fmt.Errorf("section is required")
|
||||||
|
}
|
||||||
|
if key == "" {
|
||||||
|
return nil, fmt.Errorf("key is required")
|
||||||
|
}
|
||||||
|
|
||||||
|
return &sqlWriteCloser{
|
||||||
|
kv: k,
|
||||||
|
ctx: ctx,
|
||||||
|
section: section,
|
||||||
|
key: key,
|
||||||
|
buf: &bytes.Buffer{},
|
||||||
|
closed: false,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// sqlWriteCloser implements io.WriteCloser for SQL KV Save operations
|
||||||
|
type sqlWriteCloser struct {
|
||||||
|
kv *sqlKV
|
||||||
|
ctx context.Context
|
||||||
|
section string
|
||||||
|
key string
|
||||||
|
buf *bytes.Buffer
|
||||||
|
closed bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write implements io.Writer
|
||||||
|
func (w *sqlWriteCloser) Write(p []byte) (int, error) {
|
||||||
|
if w.closed {
|
||||||
|
return 0, fmt.Errorf("write to closed writer")
|
||||||
|
}
|
||||||
|
return w.buf.Write(p)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close implements io.Closer - stores the buffered data in SQL
|
||||||
|
func (w *sqlWriteCloser) Close() error {
|
||||||
|
if w.closed {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
w.closed = true
|
||||||
|
|
||||||
|
value := w.buf.Bytes()
|
||||||
|
|
||||||
|
switch w.section {
|
||||||
|
case sectionEvents:
|
||||||
|
// Simple upsert for events section
|
||||||
|
return w.closeEvents(value)
|
||||||
|
case sectionData:
|
||||||
|
// Complex multi-table transaction for data section
|
||||||
|
return w.closeData(value)
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("unsupported section: %s", w.section)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// closeEvents handles the simple upsert for the events section
|
||||||
|
func (w *sqlWriteCloser) closeEvents(value []byte) error {
|
||||||
|
tableName, err := w.kv.getTableName(w.section)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
keyPathIdent, err := w.kv.ident("key_path")
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("invalid column identifier: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
valueIdent, err := w.kv.ident("value")
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("invalid column identifier: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ph1 := w.kv.dialect.ArgPlaceholder(1)
|
||||||
|
ph2 := w.kv.dialect.ArgPlaceholder(2)
|
||||||
|
|
||||||
|
var query string
|
||||||
|
switch w.kv.dialect.DialectName() {
|
||||||
|
case "postgres":
|
||||||
|
query = fmt.Sprintf(
|
||||||
|
"INSERT INTO %s (%s, %s) VALUES (%s, %s) ON CONFLICT (%s) DO UPDATE SET %s = EXCLUDED.%s",
|
||||||
|
tableName, keyPathIdent, valueIdent, ph1, ph2, keyPathIdent, valueIdent, valueIdent,
|
||||||
|
)
|
||||||
|
case "mysql":
|
||||||
|
query = fmt.Sprintf(
|
||||||
|
"INSERT INTO %s (%s, %s) VALUES (%s, %s) ON DUPLICATE KEY UPDATE %s = VALUES(%s)",
|
||||||
|
tableName, keyPathIdent, valueIdent, ph1, ph2, valueIdent, valueIdent,
|
||||||
|
)
|
||||||
|
case "sqlite":
|
||||||
|
query = fmt.Sprintf(
|
||||||
|
"INSERT INTO %s (%s, %s) VALUES (%s, %s) ON CONFLICT (%s) DO UPDATE SET %s = excluded.%s",
|
||||||
|
tableName, keyPathIdent, valueIdent, ph1, ph2, keyPathIdent, valueIdent, valueIdent,
|
||||||
|
)
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("unsupported dialect: %s", w.kv.dialect.DialectName())
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = w.kv.db.ExecContext(w.ctx, query, w.key, value)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("insert/update failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// closeData handles the complex multi-table transaction for the data section
|
||||||
|
func (w *sqlWriteCloser) closeData(value []byte) error {
|
||||||
|
// Parse the key to extract all fields
|
||||||
|
parsed, err := parseDataKey(w.key)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("parse key: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Generate a GUID for this write
|
||||||
|
guid := uuid.New().String()
|
||||||
|
|
||||||
|
// Execute all operations in a transaction
|
||||||
|
return w.kv.db.WithTx(w.ctx, nil, func(ctx context.Context, tx db.Tx) error {
|
||||||
|
// 1. Insert/update resource_history
|
||||||
|
if err := w.upsertResourceHistory(ctx, tx, parsed, guid, value); err != nil {
|
||||||
|
return fmt.Errorf("upsert resource_history: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. Handle resource table based on action
|
||||||
|
if parsed.Action == 3 { // deleted
|
||||||
|
if err := w.deleteResource(ctx, tx, parsed); err != nil {
|
||||||
|
return fmt.Errorf("delete resource: %w", err)
|
||||||
|
}
|
||||||
|
} else { // created or updated
|
||||||
|
if err := w.upsertResource(ctx, tx, parsed, guid, value); err != nil {
|
||||||
|
return fmt.Errorf("upsert resource: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. Upsert resource_version table
|
||||||
|
if err := w.upsertResourceVersion(ctx, tx, parsed); err != nil {
|
||||||
|
return fmt.Errorf("upsert resource_version: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// upsertResourceHistory inserts/updates a row in the resource_history table
|
||||||
|
func (w *sqlWriteCloser) upsertResourceHistory(ctx context.Context, tx db.Tx, parsed *parsedKey, guid string, value []byte) error {
|
||||||
|
// Build identifiers
|
||||||
|
tableIdent, _ := w.kv.ident("resource_history")
|
||||||
|
guidIdent, _ := w.kv.ident("guid")
|
||||||
|
groupIdent, _ := w.kv.ident("group")
|
||||||
|
resourceIdent, _ := w.kv.ident("resource")
|
||||||
|
namespaceIdent, _ := w.kv.ident("namespace")
|
||||||
|
nameIdent, _ := w.kv.ident("name")
|
||||||
|
rvIdent, _ := w.kv.ident("resource_version")
|
||||||
|
prevRVIdent, _ := w.kv.ident("previous_resource_version")
|
||||||
|
valueIdent, _ := w.kv.ident("value")
|
||||||
|
actionIdent, _ := w.kv.ident("action")
|
||||||
|
folderIdent, _ := w.kv.ident("folder")
|
||||||
|
keyPathIdent, _ := w.kv.ident("key_path")
|
||||||
|
|
||||||
|
// Build placeholders
|
||||||
|
var query string
|
||||||
|
ph := func(n int) string { return w.kv.dialect.ArgPlaceholder(n) }
|
||||||
|
|
||||||
|
switch w.kv.dialect.DialectName() {
|
||||||
|
case "postgres":
|
||||||
|
query = fmt.Sprintf(`
|
||||||
|
INSERT INTO %s (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
|
||||||
|
VALUES (%s, %s, %s, %s, %s, %s, %s, 0, %s, %s, %s)
|
||||||
|
ON CONFLICT (%s) DO UPDATE SET %s = EXCLUDED.%s, %s = EXCLUDED.%s`,
|
||||||
|
tableIdent, guidIdent, keyPathIdent, groupIdent, resourceIdent, namespaceIdent, nameIdent,
|
||||||
|
rvIdent, prevRVIdent, valueIdent, actionIdent, folderIdent,
|
||||||
|
ph(1), ph(2), ph(3), ph(4), ph(5), ph(6), ph(7), ph(8), ph(9), ph(10),
|
||||||
|
guidIdent, valueIdent, valueIdent, keyPathIdent, keyPathIdent,
|
||||||
|
)
|
||||||
|
case "mysql", "sqlite":
|
||||||
|
// For MySQL and SQLite, use INSERT OR REPLACE (requires all columns)
|
||||||
|
query = fmt.Sprintf(`
|
||||||
|
REPLACE INTO %s (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
|
||||||
|
VALUES (%s, %s, %s, %s, %s, %s, %s, 0, %s, %s, %s)`,
|
||||||
|
tableIdent, guidIdent, keyPathIdent, groupIdent, resourceIdent, namespaceIdent, nameIdent,
|
||||||
|
rvIdent, prevRVIdent, valueIdent, actionIdent, folderIdent,
|
||||||
|
ph(1), ph(2), ph(3), ph(4), ph(5), ph(6), ph(7), ph(8), ph(9), ph(10),
|
||||||
|
)
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("unsupported dialect: %s", w.kv.dialect.DialectName())
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err := tx.ExecContext(ctx, query,
|
||||||
|
guid, w.key, parsed.Group, parsed.Resource, parsed.Namespace, parsed.Name,
|
||||||
|
parsed.ResourceVersion, value, parsed.Action, parsed.Folder,
|
||||||
|
)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// upsertResource inserts/updates a row in the resource table
|
||||||
|
func (w *sqlWriteCloser) upsertResource(ctx context.Context, tx db.Tx, parsed *parsedKey, guid string, value []byte) error {
|
||||||
|
// Build identifiers
|
||||||
|
tableIdent, _ := w.kv.ident("resource")
|
||||||
|
guidIdent, _ := w.kv.ident("guid")
|
||||||
|
groupIdent, _ := w.kv.ident("group")
|
||||||
|
resourceIdent, _ := w.kv.ident("resource")
|
||||||
|
namespaceIdent, _ := w.kv.ident("namespace")
|
||||||
|
nameIdent, _ := w.kv.ident("name")
|
||||||
|
rvIdent, _ := w.kv.ident("resource_version")
|
||||||
|
valueIdent, _ := w.kv.ident("value")
|
||||||
|
actionIdent, _ := w.kv.ident("action")
|
||||||
|
|
||||||
|
var query string
|
||||||
|
ph := func(n int) string { return w.kv.dialect.ArgPlaceholder(n) }
|
||||||
|
|
||||||
|
switch w.kv.dialect.DialectName() {
|
||||||
|
case "postgres":
|
||||||
|
query = fmt.Sprintf(`
|
||||||
|
INSERT INTO %s (%s, %s, %s, %s, %s, %s, %s, %s)
|
||||||
|
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
|
||||||
|
ON CONFLICT (%s, %s, %s, %s) DO UPDATE SET
|
||||||
|
%s = EXCLUDED.%s, %s = EXCLUDED.%s, %s = EXCLUDED.%s, %s = EXCLUDED.%s`,
|
||||||
|
tableIdent, guidIdent, groupIdent, resourceIdent, namespaceIdent, nameIdent, rvIdent, valueIdent, actionIdent,
|
||||||
|
ph(1), ph(2), ph(3), ph(4), ph(5), ph(6), ph(7), ph(8),
|
||||||
|
namespaceIdent, groupIdent, resourceIdent, nameIdent,
|
||||||
|
guidIdent, guidIdent, rvIdent, rvIdent, valueIdent, valueIdent, actionIdent, actionIdent,
|
||||||
|
)
|
||||||
|
case "mysql", "sqlite":
|
||||||
|
query = fmt.Sprintf(`
|
||||||
|
REPLACE INTO %s (%s, %s, %s, %s, %s, %s, %s, %s)
|
||||||
|
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)`,
|
||||||
|
tableIdent, guidIdent, groupIdent, resourceIdent, namespaceIdent, nameIdent, rvIdent, valueIdent, actionIdent,
|
||||||
|
ph(1), ph(2), ph(3), ph(4), ph(5), ph(6), ph(7), ph(8),
|
||||||
|
)
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("unsupported dialect: %s", w.kv.dialect.DialectName())
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err := tx.ExecContext(ctx, query,
|
||||||
|
guid, parsed.Group, parsed.Resource, parsed.Namespace, parsed.Name,
|
||||||
|
parsed.ResourceVersion, value, parsed.Action,
|
||||||
|
)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// deleteResource deletes a row from the resource table
|
||||||
|
func (w *sqlWriteCloser) deleteResource(ctx context.Context, tx db.Tx, parsed *parsedKey) error {
|
||||||
|
tableIdent, _ := w.kv.ident("resource")
|
||||||
|
groupIdent, _ := w.kv.ident("group")
|
||||||
|
resourceIdent, _ := w.kv.ident("resource")
|
||||||
|
namespaceIdent, _ := w.kv.ident("namespace")
|
||||||
|
nameIdent, _ := w.kv.ident("name")
|
||||||
|
|
||||||
|
ph := func(n int) string { return w.kv.dialect.ArgPlaceholder(n) }
|
||||||
|
|
||||||
|
query := fmt.Sprintf(`
|
||||||
|
DELETE FROM %s WHERE %s = %s AND %s = %s AND %s = %s AND %s = %s`,
|
||||||
|
tableIdent, groupIdent, ph(1), resourceIdent, ph(2), namespaceIdent, ph(3), nameIdent, ph(4),
|
||||||
|
)
|
||||||
|
|
||||||
|
_, err := tx.ExecContext(ctx, query, parsed.Group, parsed.Resource, parsed.Namespace, parsed.Name)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// upsertResourceVersion inserts/updates the resource_version table
|
||||||
|
func (w *sqlWriteCloser) upsertResourceVersion(ctx context.Context, tx db.Tx, parsed *parsedKey) error {
|
||||||
|
tableIdent, _ := w.kv.ident("resource_version")
|
||||||
|
groupIdent, _ := w.kv.ident("group")
|
||||||
|
resourceIdent, _ := w.kv.ident("resource")
|
||||||
|
rvIdent, _ := w.kv.ident("resource_version")
|
||||||
|
|
||||||
|
ph := func(n int) string { return w.kv.dialect.ArgPlaceholder(n) }
|
||||||
|
|
||||||
|
var query string
|
||||||
|
switch w.kv.dialect.DialectName() {
|
||||||
|
case "postgres":
|
||||||
|
// Only update if new resource_version is greater than existing
|
||||||
|
query = fmt.Sprintf(`
|
||||||
|
INSERT INTO %s (%s, %s, %s) VALUES (%s, %s, %s)
|
||||||
|
ON CONFLICT (%s, %s) DO UPDATE SET %s = EXCLUDED.%s
|
||||||
|
WHERE EXCLUDED.%s > %s.%s`,
|
||||||
|
tableIdent, groupIdent, resourceIdent, rvIdent, ph(1), ph(2), ph(3),
|
||||||
|
groupIdent, resourceIdent, rvIdent, rvIdent,
|
||||||
|
rvIdent, tableIdent, rvIdent,
|
||||||
|
)
|
||||||
|
case "sqlite":
|
||||||
|
// SQLite supports WHERE clause in ON CONFLICT DO UPDATE
|
||||||
|
query = fmt.Sprintf(`
|
||||||
|
INSERT INTO %s (%s, %s, %s) VALUES (%s, %s, %s)
|
||||||
|
ON CONFLICT (%s, %s) DO UPDATE SET %s = EXCLUDED.%s
|
||||||
|
WHERE EXCLUDED.%s > %s.%s`,
|
||||||
|
tableIdent, groupIdent, resourceIdent, rvIdent, ph(1), ph(2), ph(3),
|
||||||
|
groupIdent, resourceIdent, rvIdent, rvIdent,
|
||||||
|
rvIdent, tableIdent, rvIdent,
|
||||||
|
)
|
||||||
|
case "mysql":
|
||||||
|
// MySQL uses ON DUPLICATE KEY UPDATE with conditional
|
||||||
|
query = fmt.Sprintf(`
|
||||||
|
INSERT INTO %s (%s, %s, %s) VALUES (%s, %s, %s)
|
||||||
|
ON DUPLICATE KEY UPDATE %s = IF(VALUES(%s) > %s, VALUES(%s), %s)`,
|
||||||
|
tableIdent, groupIdent, resourceIdent, rvIdent, ph(1), ph(2), ph(3),
|
||||||
|
rvIdent, rvIdent, rvIdent, rvIdent, rvIdent,
|
||||||
|
)
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("unsupported dialect: %s", w.kv.dialect.DialectName())
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err := tx.ExecContext(ctx, query, parsed.Group, parsed.Resource, parsed.ResourceVersion)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete a value
|
||||||
|
func (k *sqlKV) Delete(ctx context.Context, section string, key string) error {
|
||||||
|
if section == "" {
|
||||||
|
return fmt.Errorf("section is required")
|
||||||
|
}
|
||||||
|
if key == "" {
|
||||||
|
return fmt.Errorf("key is required")
|
||||||
|
}
|
||||||
|
|
||||||
|
tableName, err := k.getTableName(section)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
keyPathIdent, err := k.ident("key_path")
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("invalid column identifier: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// First check if key exists (to return ErrNotFound if missing)
|
||||||
|
checkQuery := fmt.Sprintf(
|
||||||
|
"SELECT 1 FROM %s WHERE %s = %s",
|
||||||
|
tableName,
|
||||||
|
keyPathIdent,
|
||||||
|
k.dialect.ArgPlaceholder(1),
|
||||||
|
)
|
||||||
|
|
||||||
|
var exists int
|
||||||
|
err = k.db.QueryRowContext(ctx, checkQuery, key).Scan(&exists)
|
||||||
|
if err != nil {
|
||||||
|
if errors.Is(err, sql.ErrNoRows) {
|
||||||
|
return ErrNotFound
|
||||||
|
}
|
||||||
|
return fmt.Errorf("check existence failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete the key
|
||||||
|
deleteQuery := fmt.Sprintf(
|
||||||
|
"DELETE FROM %s WHERE %s = %s",
|
||||||
|
tableName,
|
||||||
|
keyPathIdent,
|
||||||
|
k.dialect.ArgPlaceholder(1),
|
||||||
|
)
|
||||||
|
|
||||||
|
_, err = k.db.ExecContext(ctx, deleteQuery, key)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("delete failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// BatchDelete removes multiple keys from the store
|
||||||
|
func (k *sqlKV) BatchDelete(ctx context.Context, section string, keys []string) error {
|
||||||
|
if section == "" {
|
||||||
|
return fmt.Errorf("section is required")
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(keys) == 0 {
|
||||||
|
return nil // Nothing to delete
|
||||||
|
}
|
||||||
|
|
||||||
|
tableName, err := k.getTableName(section)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
keyPathIdent, err := k.ident("key_path")
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("invalid column identifier: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build IN clause placeholders
|
||||||
|
placeholders := make([]string, len(keys))
|
||||||
|
args := make([]interface{}, len(keys))
|
||||||
|
for i, key := range keys {
|
||||||
|
placeholders[i] = k.dialect.ArgPlaceholder(i + 1)
|
||||||
|
args[i] = key
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build the query
|
||||||
|
query := fmt.Sprintf(
|
||||||
|
"DELETE FROM %s WHERE %s IN (%s)",
|
||||||
|
tableName,
|
||||||
|
keyPathIdent,
|
||||||
|
strings.Join(placeholders, ", "),
|
||||||
|
)
|
||||||
|
|
||||||
|
// Execute the query (idempotent - non-existent keys are silently ignored)
|
||||||
|
_, err = k.db.ExecContext(ctx, query, args...)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("batch delete failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnixTimestamp returns the current time in seconds since Epoch
|
||||||
|
func (k *sqlKV) UnixTimestamp(ctx context.Context) (int64, error) {
|
||||||
|
return time.Now().Unix(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ping checks if the database connection is alive
|
||||||
|
func (k *sqlKV) Ping(ctx context.Context) error {
|
||||||
|
if k.db == nil {
|
||||||
|
return fmt.Errorf("database connection is nil")
|
||||||
|
}
|
||||||
|
return k.db.PingContext(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
// checkDB verifies the database connection is still valid before operations
|
||||||
|
func (k *sqlKV) checkDB() error {
|
||||||
|
if k.db == nil {
|
||||||
|
return fmt.Errorf("database connection is nil")
|
||||||
|
}
|
||||||
|
// Quick ping to verify connection is alive
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||||
|
defer cancel()
|
||||||
|
if err := k.db.PingContext(ctx); err != nil {
|
||||||
|
return fmt.Errorf("database connection is not healthy: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
@@ -66,9 +66,15 @@ type kvStorageBackend struct {
|
|||||||
withExperimentalClusterScope bool
|
withExperimentalClusterScope bool
|
||||||
//tracer trace.Tracer
|
//tracer trace.Tracer
|
||||||
//reg prometheus.Registerer
|
//reg prometheus.Registerer
|
||||||
|
|
||||||
|
// lifecycle management
|
||||||
|
ctx context.Context
|
||||||
|
cancel context.CancelFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ StorageBackend = &kvStorageBackend{}
|
var _ StorageBackend = &kvStorageBackend{}
|
||||||
|
var _ LifecycleHooks = &kvStorageBackend{}
|
||||||
|
var _ resourcepb.DiagnosticsServer = &kvStorageBackend{}
|
||||||
|
|
||||||
type KVBackendOptions struct {
|
type KVBackendOptions struct {
|
||||||
KvStore KV
|
KvStore KV
|
||||||
@@ -81,7 +87,6 @@ type KVBackendOptions struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func NewKVStorageBackend(opts KVBackendOptions) (StorageBackend, error) {
|
func NewKVStorageBackend(opts KVBackendOptions) (StorageBackend, error) {
|
||||||
ctx := context.Background()
|
|
||||||
kv := opts.KvStore
|
kv := opts.KvStore
|
||||||
|
|
||||||
s, err := snowflake.NewNode(rand.Int64N(1024))
|
s, err := snowflake.NewNode(rand.Int64N(1024))
|
||||||
@@ -100,6 +105,9 @@ func NewKVStorageBackend(opts KVBackendOptions) (StorageBackend, error) {
|
|||||||
eventPruningInterval = defaultEventPruningInterval
|
eventPruningInterval = defaultEventPruningInterval
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Create a cancellable context for lifecycle management
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
|
||||||
backend := &kvStorageBackend{
|
backend := &kvStorageBackend{
|
||||||
kv: kv,
|
kv: kv,
|
||||||
dataStore: newDataStore(kv),
|
dataStore: newDataStore(kv),
|
||||||
@@ -111,18 +119,58 @@ func NewKVStorageBackend(opts KVBackendOptions) (StorageBackend, error) {
|
|||||||
eventRetentionPeriod: eventRetentionPeriod,
|
eventRetentionPeriod: eventRetentionPeriod,
|
||||||
eventPruningInterval: eventPruningInterval,
|
eventPruningInterval: eventPruningInterval,
|
||||||
withExperimentalClusterScope: opts.WithExperimentalClusterScope,
|
withExperimentalClusterScope: opts.WithExperimentalClusterScope,
|
||||||
|
ctx: ctx,
|
||||||
|
cancel: cancel,
|
||||||
}
|
}
|
||||||
err = backend.initPruner(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to initialize pruner: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start the event cleanup background job
|
|
||||||
go backend.runCleanupOldEvents(ctx)
|
|
||||||
|
|
||||||
return backend, nil
|
return backend, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Init implements LifecycleHooks
|
||||||
|
func (k *kvStorageBackend) Init(ctx context.Context) error {
|
||||||
|
// Initialize the pruner
|
||||||
|
if err := k.initPruner(ctx); err != nil {
|
||||||
|
return fmt.Errorf("failed to initialize pruner: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start the event cleanup background job using the backend's lifecycle context
|
||||||
|
go k.runCleanupOldEvents(k.ctx)
|
||||||
|
|
||||||
|
// Start the pruner if it was configured
|
||||||
|
if k.historyPruner != nil {
|
||||||
|
k.historyPruner.Start(k.ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop implements LifecycleHooks
|
||||||
|
func (k *kvStorageBackend) Stop(ctx context.Context) error {
|
||||||
|
// Cancel the context to stop background goroutines
|
||||||
|
// This will stop both runCleanupOldEvents and the pruner (via debouncer)
|
||||||
|
k.cancel()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsHealthy implements DiagnosticsServer
|
||||||
|
func (k *kvStorageBackend) IsHealthy(ctx context.Context, _ *resourcepb.HealthCheckRequest) (*resourcepb.HealthCheckResponse, error) {
|
||||||
|
// Check if the underlying KV store supports Ping (e.g., sqlKV)
|
||||||
|
type pinger interface {
|
||||||
|
Ping(context.Context) error
|
||||||
|
}
|
||||||
|
if p, ok := k.kv.(pinger); ok {
|
||||||
|
if err := p.Ping(ctx); err != nil {
|
||||||
|
return nil, fmt.Errorf("KV store health check failed: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return &resourcepb.HealthCheckResponse{Status: resourcepb.HealthCheckResponse_SERVING}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read implements DiagnosticsServer
|
||||||
|
func (k *kvStorageBackend) Read(ctx context.Context, req *resourcepb.ReadRequest) (*resourcepb.ReadResponse, error) {
|
||||||
|
return nil, ErrNotImplementedYet
|
||||||
|
}
|
||||||
|
|
||||||
// runCleanupOldEvents starts a background goroutine that periodically cleans up old events
|
// runCleanupOldEvents starts a background goroutine that periodically cleans up old events
|
||||||
func (k *kvStorageBackend) runCleanupOldEvents(ctx context.Context) {
|
func (k *kvStorageBackend) runCleanupOldEvents(ctx context.Context) {
|
||||||
// Run cleanup every hour
|
// Run cleanup every hour
|
||||||
@@ -217,7 +265,6 @@ func (k *kvStorageBackend) initPruner(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
k.historyPruner = pruner
|
k.historyPruner = pruner
|
||||||
k.historyPruner.Start(ctx)
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,11 +1,45 @@
|
|||||||
UPDATE {{ .Ident "resource_history" }}
|
UPDATE {{ .Ident "resource_history" }}
|
||||||
SET {{ .Ident "resource_version" }} = (
|
SET
|
||||||
|
{{ .Ident "resource_version" }} = (
|
||||||
CASE
|
CASE
|
||||||
{{ range $guid, $rv := .GUIDToRV }}
|
{{ range $guid, $rv := .GUIDToRV }}
|
||||||
WHEN {{ $.Ident "guid" }} = {{ $.Arg $guid }} THEN CAST({{ $.Arg $rv }} AS {{ if eq $.DialectName "postgres" }}BIGINT{{ else }}SIGNED{{ end }})
|
WHEN {{ $.Ident "guid" }} = {{ $.Arg $guid }} THEN CAST({{ $.Arg $rv }} AS {{ if eq $.DialectName "postgres" }}BIGINT{{ else }}SIGNED{{ end }})
|
||||||
{{ end }}
|
{{ end }}
|
||||||
END
|
END
|
||||||
)
|
),
|
||||||
|
{{ .Ident "key_path" }} = {{ if eq .DialectName "sqlite" -}}
|
||||||
|
{{ .Ident "group" }} || CHAR(47) || {{ .Ident "resource" }} || CHAR(47) || {{ .Ident "namespace" }} || CHAR(47) || {{ .Ident "name" }} || CHAR(47) ||
|
||||||
|
CAST((CASE
|
||||||
|
{{- range $guid, $rv := .GUIDToRV }}
|
||||||
|
WHEN {{ $.Ident "guid" }} = {{ $.Arg $guid }} THEN ((({{ $.Arg $rv }} / 1000) - 1288834974657) * 4194304) + ({{ $.Arg $rv }} % 1000)
|
||||||
|
{{- end }}
|
||||||
|
END) AS TEXT) || CHAR(126) ||
|
||||||
|
CASE {{ .Ident "action" }}
|
||||||
|
WHEN 1 THEN 'created'
|
||||||
|
WHEN 2 THEN 'updated'
|
||||||
|
WHEN 3 THEN 'deleted'
|
||||||
|
ELSE 'unknown'
|
||||||
|
END || CHAR(126) || COALESCE({{ .Ident "folder" }}, '')
|
||||||
|
{{- else -}}
|
||||||
|
CONCAT(
|
||||||
|
{{ .Ident "group" }}, CHAR(47),
|
||||||
|
{{ .Ident "resource" }}, CHAR(47),
|
||||||
|
{{ .Ident "namespace" }}, CHAR(47),
|
||||||
|
{{ .Ident "name" }}, CHAR(47),
|
||||||
|
CAST((CASE
|
||||||
|
{{- range $guid, $rv := .GUIDToRV }}
|
||||||
|
WHEN {{ $.Ident "guid" }} = {{ $.Arg $guid }} THEN ((({{ $.Arg $rv }} DIV 1000) - 1288834974657) * 4194304) + ({{ $.Arg $rv }} MOD 1000)
|
||||||
|
{{- end }}
|
||||||
|
END) AS {{ if eq .DialectName "postgres" }}TEXT{{ else }}CHAR{{ end }}), CHAR(126),
|
||||||
|
CASE {{ .Ident "action" }}
|
||||||
|
WHEN 1 THEN 'created'
|
||||||
|
WHEN 2 THEN 'updated'
|
||||||
|
WHEN 3 THEN 'deleted'
|
||||||
|
ELSE 'unknown'
|
||||||
|
END, CHAR(126),
|
||||||
|
COALESCE({{ .Ident "folder" }}, '')
|
||||||
|
)
|
||||||
|
{{- end }}
|
||||||
WHERE {{ .Ident "guid" }} IN (
|
WHERE {{ .Ident "guid" }} IN (
|
||||||
{{$first := true}}
|
{{$first := true}}
|
||||||
{{ range $guid, $rv := .GUIDToRV }}{{if $first}}{{$first = false}}{{else}}, {{end}}{{ $.Arg $guid }}{{ end }}
|
{{ range $guid, $rv := .GUIDToRV }}{{if $first}}{{$first = false}}{{else}}, {{end}}{{ $.Arg $guid }}{{ end }}
|
||||||
|
|||||||
@@ -3,7 +3,9 @@ package migrations
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/grafana/grafana/pkg/infra/log"
|
||||||
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
|
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
|
||||||
|
"github.com/grafana/grafana/pkg/util/xorm"
|
||||||
)
|
)
|
||||||
|
|
||||||
func initResourceTables(mg *migrator.Migrator) string {
|
func initResourceTables(mg *migrator.Migrator) string {
|
||||||
@@ -185,5 +187,170 @@ func initResourceTables(mg *migrator.Migrator) string {
|
|||||||
Name: "UQE_resource_last_import_time_last_import_time",
|
Name: "UQE_resource_last_import_time_last_import_time",
|
||||||
}))
|
}))
|
||||||
|
|
||||||
|
// TODO: Do we want the value to be MEDIUMTEXT ?
|
||||||
|
// TODO: What's the best name for the key_path column?
|
||||||
|
|
||||||
|
// Add key_path column to resource_history for KV interface
|
||||||
|
mg.AddMigration("Add key_path column to resource_history", migrator.NewAddColumnMigration(resource_history_table, &migrator.Column{
|
||||||
|
Name: "key_path", Type: migrator.DB_NVarchar, Length: 2048, Nullable: true,
|
||||||
|
}))
|
||||||
|
|
||||||
|
// Backfill key_path column in resource_history
|
||||||
|
mg.AddMigration("Backfill key_path column in resource_history", &resourceHistoryKeyBackfillMigrator{})
|
||||||
|
|
||||||
|
// Note: key_path remains nullable because the write pattern is:
|
||||||
|
// 1. INSERT (key_path = NULL)
|
||||||
|
// 2. UPDATE (key_path = actual value after RV allocation)
|
||||||
|
|
||||||
|
// Add index on key_path column
|
||||||
|
mg.AddMigration("Add index on key_path column in resource_history", migrator.NewAddIndexMigration(resource_history_table, &migrator.Index{
|
||||||
|
Name: "IDX_resource_history_key_path",
|
||||||
|
Cols: []string{"key_path"},
|
||||||
|
Type: migrator.IndexType,
|
||||||
|
}))
|
||||||
|
|
||||||
|
// Create resource_events table for KV interface
|
||||||
|
resource_events_table := migrator.Table{
|
||||||
|
Name: "resource_events",
|
||||||
|
Columns: []*migrator.Column{
|
||||||
|
{Name: "key_path", Type: migrator.DB_NVarchar, Length: 2048, Nullable: false, IsPrimaryKey: true},
|
||||||
|
{Name: "value", Type: migrator.DB_MediumText, Nullable: false},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
mg.AddMigration("create table "+resource_events_table.Name, migrator.NewAddTableMigration(resource_events_table))
|
||||||
|
|
||||||
return marker
|
return marker
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// resourceHistoryKeyBackfillMigrator backfills the key_path column in resource_history table
|
||||||
|
// It processes rows in batches to reduce lock duration and avoid timeouts on large tables
|
||||||
|
type resourceHistoryKeyBackfillMigrator struct {
|
||||||
|
migrator.MigrationBase
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *resourceHistoryKeyBackfillMigrator) SQL(dialect migrator.Dialect) string {
|
||||||
|
return "Backfill key_path column in resource_history using pattern: {Group}/{Resource}/{Namespace}/{Name}/{ResourceVersion}~{Action}~{Folder}"
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *resourceHistoryKeyBackfillMigrator) Exec(sess *xorm.Session, mg *migrator.Migrator) error {
|
||||||
|
dialect := mg.Dialect.DriverName()
|
||||||
|
logger := log.New("resource-history-key-backfill")
|
||||||
|
|
||||||
|
// TODO: Verify the RV to Snowflake ID conversion is correct.
|
||||||
|
|
||||||
|
// Snowflake ID epoch in milliseconds (2010-11-04T01:42:54.657Z)
|
||||||
|
const epochMs = 1288834974657
|
||||||
|
const batchSize = 1000 // Process 1000 rows at a time
|
||||||
|
|
||||||
|
// Count total rows to backfill
|
||||||
|
totalCount, err := sess.Table("resource_history").Where("key_path IS NULL").Count()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to count rows: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if totalCount == 0 {
|
||||||
|
logger.Info("No rows to backfill")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.Info("Starting key_path backfill", "total_rows", totalCount)
|
||||||
|
|
||||||
|
// Build the SQL query based on the database dialect
|
||||||
|
var updateSQL string
|
||||||
|
|
||||||
|
switch dialect {
|
||||||
|
case "mysql":
|
||||||
|
updateSQL = `
|
||||||
|
UPDATE resource_history
|
||||||
|
SET key_path = CONCAT(
|
||||||
|
` + "`group`" + `, '/',
|
||||||
|
` + "`resource`" + `, '/',
|
||||||
|
` + "`namespace`" + `, '/',
|
||||||
|
` + "`name`" + `, '/',
|
||||||
|
CAST((((` + "`resource_version`" + ` DIV 1000) - ?) * 4194304) + (` + "`resource_version`" + ` MOD 1000) AS CHAR), '~',
|
||||||
|
CASE ` + "`action`" + `
|
||||||
|
WHEN 1 THEN 'created'
|
||||||
|
WHEN 2 THEN 'updated'
|
||||||
|
WHEN 3 THEN 'deleted'
|
||||||
|
ELSE 'unknown'
|
||||||
|
END, '~',
|
||||||
|
COALESCE(` + "`folder`" + `, '')
|
||||||
|
)
|
||||||
|
WHERE key_path IS NULL
|
||||||
|
LIMIT ?
|
||||||
|
`
|
||||||
|
case "postgres":
|
||||||
|
updateSQL = `
|
||||||
|
UPDATE resource_history
|
||||||
|
SET key_path = CONCAT(
|
||||||
|
"group", '/',
|
||||||
|
"resource", '/',
|
||||||
|
"namespace", '/',
|
||||||
|
"name", '/',
|
||||||
|
CAST((((resource_version / 1000) - $1) * 4194304) + (resource_version % 1000) AS BIGINT), '~',
|
||||||
|
CASE "action"
|
||||||
|
WHEN 1 THEN 'created'
|
||||||
|
WHEN 2 THEN 'updated'
|
||||||
|
WHEN 3 THEN 'deleted'
|
||||||
|
ELSE 'unknown'
|
||||||
|
END, '~',
|
||||||
|
COALESCE("folder", '')
|
||||||
|
)
|
||||||
|
WHERE guid IN (
|
||||||
|
SELECT guid FROM resource_history
|
||||||
|
WHERE key_path IS NULL
|
||||||
|
LIMIT $2
|
||||||
|
)
|
||||||
|
`
|
||||||
|
case "sqlite3":
|
||||||
|
updateSQL = `
|
||||||
|
UPDATE resource_history
|
||||||
|
SET key_path =
|
||||||
|
"group" || '/' ||
|
||||||
|
resource || '/' ||
|
||||||
|
namespace || '/' ||
|
||||||
|
name || '/' ||
|
||||||
|
CAST((((resource_version / 1000) - ?) * 4194304) + (resource_version % 1000) AS TEXT) || '~' ||
|
||||||
|
CASE action
|
||||||
|
WHEN 1 THEN 'created'
|
||||||
|
WHEN 2 THEN 'updated'
|
||||||
|
WHEN 3 THEN 'deleted'
|
||||||
|
ELSE 'unknown'
|
||||||
|
END || '~' ||
|
||||||
|
COALESCE(folder, '')
|
||||||
|
WHERE guid IN (
|
||||||
|
SELECT guid FROM resource_history
|
||||||
|
WHERE key_path IS NULL
|
||||||
|
LIMIT ?
|
||||||
|
)
|
||||||
|
`
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("unsupported database dialect: %s", dialect)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Process in batches
|
||||||
|
processed := int64(0)
|
||||||
|
for {
|
||||||
|
result, err := sess.Exec(updateSQL, epochMs, batchSize)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to update batch: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
rowsAffected, err := result.RowsAffected()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to get rows affected: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
processed += rowsAffected
|
||||||
|
logger.Info("Backfill progress", "processed", processed, "total", totalCount,
|
||||||
|
"percent", fmt.Sprintf("%.1f%%", float64(processed)/float64(totalCount)*100))
|
||||||
|
|
||||||
|
// If we updated fewer rows than batch size, we're done
|
||||||
|
if rowsAffected < int64(batchSize) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.Info("Backfill completed", "total_processed", processed)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|||||||
@@ -19,6 +19,7 @@ import (
|
|||||||
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
|
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
|
||||||
"github.com/grafana/grafana/pkg/setting"
|
"github.com/grafana/grafana/pkg/setting"
|
||||||
"github.com/grafana/grafana/pkg/storage/unified/resource"
|
"github.com/grafana/grafana/pkg/storage/unified/resource"
|
||||||
|
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
||||||
"github.com/grafana/grafana/pkg/storage/unified/sql/db/dbimpl"
|
"github.com/grafana/grafana/pkg/storage/unified/sql/db/dbimpl"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -101,21 +102,46 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
|
|||||||
//nolint:staticcheck // not yet migrated to OpenFeature
|
//nolint:staticcheck // not yet migrated to OpenFeature
|
||||||
withPruner := opts.Features.IsEnabledGlobally(featuremgmt.FlagUnifiedStorageHistoryPruner)
|
withPruner := opts.Features.IsEnabledGlobally(featuremgmt.FlagUnifiedStorageHistoryPruner)
|
||||||
|
|
||||||
backend, err := NewBackend(BackendOptions{
|
// Check if KV backend is enabled via feature flag
|
||||||
DBProvider: eDB,
|
//nolint:staticcheck // not yet migrated to OpenFeature
|
||||||
Tracer: opts.Tracer,
|
if opts.Features.IsEnabledGlobally(featuremgmt.FlagUnifiedStorageKVBackend) {
|
||||||
Reg: opts.Reg,
|
// Create SQL KV instance
|
||||||
IsHA: isHA,
|
sqlKV, err := resource.NewSQLKV(eDB)
|
||||||
withPruner: withPruner,
|
if err != nil {
|
||||||
storageMetrics: opts.StorageMetrics,
|
return nil, fmt.Errorf("create SQL KV: %w", err)
|
||||||
LastImportTimeMaxAge: opts.SearchOptions.MaxIndexAge, // No need to keep last_import_times older than max index age.
|
}
|
||||||
})
|
|
||||||
if err != nil {
|
// Use existing KV storage backend (already implements StorageBackend interface)
|
||||||
return nil, err
|
kvBackend, err := resource.NewKVStorageBackend(resource.KVBackendOptions{
|
||||||
|
KvStore: sqlKV,
|
||||||
|
WithPruner: withPruner,
|
||||||
|
Tracer: opts.Tracer,
|
||||||
|
Reg: opts.Reg,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("create KV backend: %w", err)
|
||||||
|
}
|
||||||
|
serverOptions.Backend = kvBackend
|
||||||
|
serverOptions.Lifecycle = kvBackend.(resource.LifecycleHooks)
|
||||||
|
serverOptions.Diagnostics = kvBackend.(resourcepb.DiagnosticsServer)
|
||||||
|
} else {
|
||||||
|
// Use existing SQL backend
|
||||||
|
backend, err := NewBackend(BackendOptions{
|
||||||
|
DBProvider: eDB,
|
||||||
|
Tracer: opts.Tracer,
|
||||||
|
Reg: opts.Reg,
|
||||||
|
IsHA: isHA,
|
||||||
|
withPruner: withPruner,
|
||||||
|
storageMetrics: opts.StorageMetrics,
|
||||||
|
LastImportTimeMaxAge: opts.SearchOptions.MaxIndexAge, // No need to keep last_import_times older than max index age.
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
serverOptions.Backend = backend
|
||||||
|
serverOptions.Diagnostics = backend
|
||||||
|
serverOptions.Lifecycle = backend
|
||||||
}
|
}
|
||||||
serverOptions.Backend = backend
|
|
||||||
serverOptions.Diagnostics = backend
|
|
||||||
serverOptions.Lifecycle = backend
|
|
||||||
}
|
}
|
||||||
|
|
||||||
serverOptions.Search = opts.SearchOptions
|
serverOptions.Search = opts.SearchOptions
|
||||||
|
|||||||
Reference in New Issue
Block a user