Compare commits

...

7 Commits

Author SHA1 Message Date
Rafael Paulovic
848041d894 pass resource db provider 2025-11-27 19:24:52 +01:00
Rafael Paulovic
c007438e90 revert changes to newResourceDBProvider 2025-11-27 14:29:46 +01:00
Rafael Paulovic
97804e3a66 Merge remote-tracking branch 'origin/main' into fix-sqlite-data-migration-pt2 2025-11-27 13:37:35 +01:00
Rafael Paulovic
58265aad2e fix: unified SQLite migration with SQLStore migrator 2025-11-27 12:59:33 +01:00
Rafael Paulovic
b16c102c46 chore: refactor test cases into interface 2025-11-27 12:34:55 +01:00
Rafael Paulovic
79f812ddb5 chore: add comment and adjust db path name 2025-11-26 11:21:50 +01:00
Rafael Paulovic
7063df0336 feat: unified storage migrations integration tests 2025-11-25 20:16:32 +01:00
14 changed files with 461 additions and 223 deletions

View File

@@ -15,7 +15,6 @@ import (
_ "github.com/blugelabs/bluge"
_ "github.com/blugelabs/bluge_segment_api"
_ "github.com/crewjam/saml"
_ "github.com/docker/go-connections/nat"
_ "github.com/go-jose/go-jose/v4"
_ "github.com/gobwas/glob"
_ "github.com/googleapis/gax-go/v2"
@@ -31,7 +30,6 @@ import (
_ "github.com/spf13/cobra" // used by the standalone apiserver cli
_ "github.com/spyzhov/ajson"
_ "github.com/stretchr/testify/require"
_ "github.com/testcontainers/testcontainers-go"
_ "gocloud.dev/secrets/awskms"
_ "gocloud.dev/secrets/azurekeyvault"
_ "gocloud.dev/secrets/gcpkms"
@@ -56,7 +54,9 @@ import (
_ "github.com/grafana/e2e"
_ "github.com/grafana/gofpdf"
_ "github.com/grafana/gomemcache/memcache"
_ "github.com/grafana/tempo/pkg/traceql"
_ "github.com/grafana/grafana/apps/alerting/alertenrichment/pkg/apis/alertenrichment/v1beta1"
_ "github.com/grafana/grafana/apps/scope/pkg/apis/scope/v0alpha1"
_ "github.com/grafana/tempo/pkg/traceql"
_ "github.com/testcontainers/testcontainers-go"
)

View File

@@ -146,6 +146,7 @@ var wireExtsBasicSet = wire.NewSet(
wire.Bind(new(sandbox.Sandbox), new(*sandbox.Service)),
wire.Struct(new(unified.Options), "*"),
unified.ProvideUnifiedStorageClient,
wire.Bind(new(resource.ResourceClient), new(*unified.ResourceClient)),
sql.ProvideStorageBackend,
builder.ProvideDefaultBuildHandlerChainFuncFromBuilders,
aggregatorrunner.ProvideNoopAggregatorConfigurator,

View File

@@ -33,7 +33,10 @@ import (
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/search"
"github.com/grafana/grafana/pkg/storage/unified/sql"
"github.com/grafana/grafana/pkg/storage/unified/sql/db"
"github.com/grafana/grafana/pkg/storage/unified/sql/db/dbimpl"
"github.com/grafana/grafana/pkg/util/scheduler"
"github.com/grafana/grafana/pkg/util/xorm"
)
type Options struct {
@@ -47,6 +50,21 @@ type Options struct {
SecureValues secrets.InlineSecureValueSupport
}
// ResourceClient pairs a resource.ResourceClient (embedded, so all of its
// methods are available directly on this type) with an optional
// db.EngineProvider that GetEngine uses to hand out an xorm engine.
type ResourceClient struct {
resource.ResourceClient
// engineProvider may be nil; GetEngine checks for that before using it.
engineProvider db.EngineProvider
}
// GetEngine exposes the xorm.Engine supplied by the client's engine provider.
// When no engine provider was set (noted upstream as the gRPC and file-based
// storage modes) there is no engine to hand out and nil is returned.
func (c *ResourceClient) GetEngine() *xorm.Engine {
	provider := c.engineProvider
	if provider == nil {
		return nil
	}
	return provider.GetEngine()
}
type clientMetrics struct {
requestDuration *prometheus.HistogramVec
requestRetries *prometheus.CounterVec
@@ -56,9 +74,9 @@ type clientMetrics struct {
func ProvideUnifiedStorageClient(opts *Options,
storageMetrics *resource.StorageMetrics,
indexMetrics *resource.BleveIndexMetrics,
) (resource.ResourceClient, error) {
) (*ResourceClient, error) {
apiserverCfg := opts.Cfg.SectionWithEnvOverrides("grafana-apiserver")
client, err := newClient(options.StorageOptions{
client, engineProvider, err := newClient(options.StorageOptions{
StorageType: options.StorageType(apiserverCfg.Key("storage_type").MustString(string(options.StorageTypeUnified))),
DataPath: apiserverCfg.Key("storage_path").MustString(filepath.Join(opts.Cfg.DataPath, "grafana-apiserver")),
Address: apiserverCfg.Key("address").MustString(""),
@@ -67,34 +85,39 @@ func ProvideUnifiedStorageClient(opts *Options,
BlobThresholdBytes: apiserverCfg.Key("blob_threshold_bytes").MustInt(options.BlobThresholdDefault),
GrpcClientKeepaliveTime: apiserverCfg.Key("grpc_client_keepalive_time").MustDuration(0),
}, opts.Cfg, opts.Features, opts.DB, opts.Tracer, opts.Reg, opts.Authzc, opts.Docs, storageMetrics, indexMetrics, opts.SecureValues)
if err == nil {
// Decide whether to disable SQL fallback stats per resource in Mode 5.
// Otherwise we would still try to query the legacy SQL database in Mode 5.
var disableDashboardsFallback, disableFoldersFallback bool
if opts.Cfg != nil {
// String are static here, so we don't need to import the packages.
foldersMode := opts.Cfg.UnifiedStorage["folders.folder.grafana.app"].DualWriterMode
disableFoldersFallback = foldersMode == grafanarest.Mode5
dashboardsMode := opts.Cfg.UnifiedStorage["dashboards.dashboard.grafana.app"].DualWriterMode
disableDashboardsFallback = dashboardsMode == grafanarest.Mode5
}
// Used to get the folder stats
client = federated.NewFederatedClient(
client, // The original
legacysql.NewDatabaseProvider(opts.DB),
disableDashboardsFallback,
disableFoldersFallback,
)
if err != nil {
return nil, err
}
return client, err
// Decide whether to disable SQL fallback stats per resource in Mode 5.
// Otherwise we would still try to query the legacy SQL database in Mode 5.
var disableDashboardsFallback, disableFoldersFallback bool
if opts.Cfg != nil {
// String are static here, so we don't need to import the packages.
foldersMode := opts.Cfg.UnifiedStorage["folders.folder.grafana.app"].DualWriterMode
disableFoldersFallback = foldersMode == grafanarest.Mode5
dashboardsMode := opts.Cfg.UnifiedStorage["dashboards.dashboard.grafana.app"].DualWriterMode
disableDashboardsFallback = dashboardsMode == grafanarest.Mode5
}
// Used to get the folder stats
federatedClient := federated.NewFederatedClient(
client,
legacysql.NewDatabaseProvider(opts.DB),
disableDashboardsFallback,
disableFoldersFallback,
)
return &ResourceClient{
ResourceClient: federatedClient,
engineProvider: engineProvider,
}, nil
}
func newClient(opts options.StorageOptions,
cfg *setting.Cfg,
features featuremgmt.FeatureToggles,
db infraDB.DB,
infradb infraDB.DB,
tracer tracing.Tracer,
reg prometheus.Registerer,
authzc types.AccessClient,
@@ -102,7 +125,7 @@ func newClient(opts options.StorageOptions,
storageMetrics *resource.StorageMetrics,
indexMetrics *resource.BleveIndexMetrics,
secure secrets.InlineSecureValueSupport,
) (resource.ResourceClient, error) {
) (resource.ResourceClient, db.EngineProvider, error) {
ctx := context.Background()
switch opts.StorageType {
@@ -112,18 +135,18 @@ func newClient(opts options.StorageOptions,
}
// Create BadgerDB instance
db, err := badger.Open(badger.DefaultOptions(filepath.Join(opts.DataPath, "badger")).
badgerDB, err := badger.Open(badger.DefaultOptions(filepath.Join(opts.DataPath, "badger")).
WithLogger(nil))
if err != nil {
return nil, err
return nil, nil, err
}
kv := resource.NewBadgerKV(db)
kv := resource.NewBadgerKV(badgerDB)
backend, err := resource.NewKVStorageBackend(resource.KVBackendOptions{
KvStore: kv,
})
if err != nil {
return nil, err
return nil, nil, err
}
server, err := resource.NewResourceServer(resource.ResourceServerOptions{
@@ -133,13 +156,13 @@ func newClient(opts options.StorageOptions,
},
})
if err != nil {
return nil, err
return nil, nil, err
}
return resource.NewLocalResourceClient(server), nil
return resource.NewLocalResourceClient(server), nil, nil
case options.StorageTypeUnifiedGrpc:
if opts.Address == "" {
return nil, fmt.Errorf("expecting address for storage_type: %s", opts.StorageType)
return nil, nil, fmt.Errorf("expecting address for storage_type: %s", opts.StorageType)
}
var (
@@ -151,30 +174,44 @@ func newClient(opts options.StorageOptions,
conn, err = newGrpcConn(opts.Address, metrics, features, opts.GrpcClientKeepaliveTime)
if err != nil {
return nil, err
return nil, nil, err
}
if opts.SearchServerAddress != "" {
indexConn, err = newGrpcConn(opts.SearchServerAddress, metrics, features, opts.GrpcClientKeepaliveTime)
if err != nil {
return nil, err
return nil, nil, err
}
} else {
indexConn = conn
}
// Create a resource client
return resource.NewResourceClient(conn, indexConn, cfg, features, tracer)
client, err := resource.NewResourceClient(conn, indexConn, cfg, features, tracer)
return client, nil, err
default:
searchOptions, err := search.NewSearchOptions(features, cfg, tracer, docs, indexMetrics, nil)
if err != nil {
return nil, err
return nil, nil, err
}
// Create DBProvider for SQL storage - this will be shared with migration service
dbProvider, err := dbimpl.ProvideResourceDB(infradb, cfg, tracer)
if err != nil {
return nil, nil, err
}
// The concrete type implements both DBProvider and EngineProvider
engineProvider, ok := dbProvider.(db.EngineProvider)
if !ok {
return nil, nil, fmt.Errorf("DBProvider does not implement EngineProvider")
}
serverOptions := sql.ServerOptions{
DB: db,
DB: infradb,
DBProvider: dbProvider,
Cfg: cfg,
Tracer: tracer,
Reg: reg,
@@ -194,28 +231,28 @@ func newClient(opts options.StorageOptions,
Logger: cfg.Logger,
})
if err := services.StartAndAwaitRunning(ctx, queue); err != nil {
return nil, fmt.Errorf("failed to start queue: %w", err)
return nil, nil, fmt.Errorf("failed to start queue: %w", err)
}
scheduler, err := scheduler.NewScheduler(queue, &scheduler.Config{
NumWorkers: cfg.QOSNumberWorker,
Logger: cfg.Logger,
})
if err != nil {
return nil, fmt.Errorf("failed to create scheduler: %w", err)
return nil, nil, fmt.Errorf("failed to create scheduler: %w", err)
}
err = services.StartAndAwaitRunning(ctx, scheduler)
if err != nil {
return nil, fmt.Errorf("failed to start scheduler: %w", err)
return nil, nil, fmt.Errorf("failed to start scheduler: %w", err)
}
serverOptions.QOSQueue = queue
}
server, err := sql.NewResourceServer(serverOptions)
if err != nil {
return nil, err
return nil, nil, err
}
return resource.NewLocalResourceClient(server), nil
return resource.NewLocalResourceClient(server), engineProvider, nil
}
}

View File

@@ -31,7 +31,7 @@ func TestUnifiedStorageClient(t *testing.T) {
resourceServer.resetCalls()
indexServer.resetCalls()
client, err := newClient(
client, _, err := newClient(
options.StorageOptions{
StorageType: options.StorageTypeUnifiedGrpc,
Address: resourceServerAddress,
@@ -64,7 +64,7 @@ func TestUnifiedStorageClient(t *testing.T) {
resourceServer.resetCalls()
indexServer.resetCalls()
client, err := newClient(
client, _, err := newClient(
options.StorageOptions{
StorageType: options.StorageTypeUnifiedGrpc,
Address: resourceServerAddress,

View File

@@ -10,6 +10,7 @@ import (
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/registry/apis/dashboard/legacy"
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
"github.com/grafana/grafana/pkg/util/xorm"
"k8s.io/apimachinery/pkg/runtime/schema"
@@ -25,11 +26,12 @@ type Validator interface {
// ResourceMigration handles migration of specific resource types from legacy to unified storage.
type ResourceMigration struct {
migrator.MigrationBase
migrator UnifiedMigrator
resources []schema.GroupResource
migrationID string
validators []Validator // Optional: custom validation logic for this migration
log log.Logger
migrator UnifiedMigrator
resources []schema.GroupResource
migrationID string
validators []Validator // Optional: custom validation logic for this migration
log log.Logger
legacyEngine *xorm.Engine
}
// NewResourceMigration creates a new migration for the specified resources.
@@ -38,13 +40,15 @@ func NewResourceMigration(
resources []schema.GroupResource,
migrationID string,
validators []Validator,
legacyEngine *xorm.Engine,
) *ResourceMigration {
return &ResourceMigration{
migrator: migrator,
resources: resources,
migrationID: migrationID,
validators: validators,
log: log.New("storage.unified.resource_migration." + migrationID),
migrator: migrator,
resources: resources,
migrationID: migrationID,
validators: validators,
legacyEngine: legacyEngine,
log: log.New("storage.unified.resource_migration." + migrationID),
}
}
@@ -59,7 +63,7 @@ func (m *ResourceMigration) SQL(_ migrator.Dialect) string {
func (m *ResourceMigration) Exec(sess *xorm.Session, mg *migrator.Migrator) error {
ctx := context.Background()
orgs, err := m.getAllOrgs(sess)
orgs, err := m.getAllOrgs()
if err != nil {
m.log.Error("failed to get organizations", "error", err)
return fmt.Errorf("failed to get organizations: %w", err)
@@ -72,6 +76,17 @@ func (m *ResourceMigration) Exec(sess *xorm.Session, mg *migrator.Migrator) erro
m.log.Info("Starting migration for all organizations", "org_count", len(orgs), "resources", m.resources)
if mg.Dialect.DriverName() == migrator.SQLite {
// reuse transaction in SQLite to avoid "database is locked" errors
tx, err := sess.Tx()
if err != nil {
m.log.Error("Failed to get transaction from session", "error", err)
return fmt.Errorf("failed to get transaction: %w", err)
}
ctx = resource.ContextWithTransaction(ctx, tx.Tx)
m.log.Info("Stored migrator transaction in context for bulk operations (SQLite compatibility)")
}
for _, org := range orgs {
if err := m.migrateOrg(ctx, sess, org); err != nil {
return err
@@ -107,6 +122,10 @@ func (m *ResourceMigration) migrateOrg(ctx context.Context, sess *xorm.Session,
m.log.Error("Migration failed", "org_id", org.ID, "error", err, "duration", time.Since(startTime))
return fmt.Errorf("migration failed for org %d (%s): %w", org.ID, org.Name, err)
}
if response.Error != nil {
m.log.Error("Migration reported error", "org_id", org.ID, "error", response.Error.String(), "duration", time.Since(startTime))
return fmt.Errorf("migration failed for org %d (%s): %w", org.ID, org.Name, fmt.Errorf("migration error: %s", response.Error.Message))
}
// Validate the migration results
if err := m.validateMigration(migrationCtx, sess, response); err != nil {
@@ -158,9 +177,9 @@ type orgInfo struct {
}
// getAllOrgs retrieves all organizations from the database
func (m *ResourceMigration) getAllOrgs(sess *xorm.Session) ([]orgInfo, error) {
func (m *ResourceMigration) getAllOrgs() ([]orgInfo, error) {
var orgs []orgInfo
err := sess.Table("org").Cols("id", "name").Find(&orgs)
err := m.legacyEngine.Table("org").Cols("id", "name").Find(&orgs)
if err != nil {
return nil, err
}

View File

@@ -10,8 +10,10 @@ import (
"github.com/grafana/grafana/pkg/infra/log"
sqlstoremigrator "github.com/grafana/grafana/pkg/services/sqlstore/migrator"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/storage/unified"
"github.com/grafana/grafana/pkg/storage/unified/migrations/contract"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/util/xorm"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel"
"k8s.io/apimachinery/pkg/runtime/schema"
@@ -21,11 +23,12 @@ var tracer = otel.Tracer("github.com/grafana/grafana/pkg/storage/unified/migrati
var logger = log.New("storage.unified.migrations")
type UnifiedStorageMigrationServiceImpl struct {
migrator UnifiedMigrator
cfg *setting.Cfg
sqlStore db.DB
kv kvstore.KVStore
client resource.ResourceClient
migrator UnifiedMigrator
cfg *setting.Cfg
sqlStore db.DB
kv kvstore.KVStore
client *unified.ResourceClient
resourceDBEngine *xorm.Engine // For SQLite, use unified storage's engine for data migrations
}
var _ contract.UnifiedStorageMigrationService = (*UnifiedStorageMigrationServiceImpl)(nil)
@@ -36,14 +39,23 @@ func ProvideUnifiedStorageMigrationService(
cfg *setting.Cfg,
sqlStore db.DB,
kv kvstore.KVStore,
client resource.ResourceClient,
client *unified.ResourceClient,
) contract.UnifiedStorageMigrationService {
// Get engine from unified storage client, fallback to grafana core database engine
resourceEngine := client.GetEngine()
if resourceEngine != nil {
logger.Info("Using Resource DB for unified storage migrations")
} else {
logger.Info("Using SQL Store DB for unified storage migrations")
resourceEngine = sqlStore.GetEngine()
}
return &UnifiedStorageMigrationServiceImpl{
migrator: migrator,
cfg: cfg,
sqlStore: sqlStore,
kv: kv,
client: client,
migrator: migrator,
cfg: cfg,
sqlStore: sqlStore,
kv: kv,
client: client,
resourceDBEngine: resourceEngine,
}
}
@@ -61,7 +73,7 @@ func (p *UnifiedStorageMigrationServiceImpl) Run(ctx context.Context) error {
// TODO: Re-enable once migrations are ready
// TODO: add guarantee that this only runs once
// return RegisterMigrations(p.migrator, p.cfg, p.sqlStore, p.client)
//return RegisterMigrations(p.migrator, p.cfg, p.sqlStore, p.client, p.resourceDBEngine)
return nil
}
@@ -70,10 +82,11 @@ func RegisterMigrations(
cfg *setting.Cfg,
sqlStore db.DB,
client resource.ResourceClient,
resourceDBEngine *xorm.Engine,
) error {
ctx, span := tracer.Start(context.Background(), "storage.unified.RegisterMigrations")
defer span.End()
mg := sqlstoremigrator.NewScopedMigrator(sqlStore.GetEngine(), cfg, "unifiedstorage")
mg := sqlstoremigrator.NewScopedMigrator(resourceDBEngine, cfg, "unifiedstorage")
mg.AddCreateMigration()
if err := prometheus.Register(mg); err != nil {
@@ -81,12 +94,17 @@ func RegisterMigrations(
}
// Register resource migrations
registerDashboardAndFolderMigration(mg, migrator, client)
registerDashboardAndFolderMigration(mg, sqlStore.GetEngine(), migrator, client)
// Run all registered migrations (blocking)
sec := cfg.Raw.Section("database")
migrationLocking := sec.Key("migration_locking").MustBool(true)
if mg.Dialect.DriverName() == sqlstoremigrator.SQLite {
// disable migration locking for SQLite to avoid "database is locked" errors in the bulk operations
migrationLocking = false
}
if err := mg.RunMigrations(ctx,
sec.Key("migration_locking").MustBool(true),
migrationLocking,
sec.Key("locking_attempt_timeout_sec").MustInt()); err != nil {
return fmt.Errorf("unified storage data migration failed: %w", err)
}
@@ -95,15 +113,18 @@ func RegisterMigrations(
return nil
}
func registerDashboardAndFolderMigration(mg *sqlstoremigrator.Migrator, migrator UnifiedMigrator, client resource.ResourceClient) {
func registerDashboardAndFolderMigration(mg *sqlstoremigrator.Migrator, legacyEngine *xorm.Engine, migrator UnifiedMigrator, client resource.ResourceClient) {
folders := schema.GroupResource{Group: "folder.grafana.app", Resource: "folders"}
dashboards := schema.GroupResource{Group: "dashboard.grafana.app", Resource: "dashboards"}
driverName := mg.Dialect.DriverName()
folderCountValidator := NewCountValidator(
client,
folders,
"dashboard",
"org_id = ? and is_folder = true",
driverName,
legacyEngine,
)
dashboardCountValidator := NewCountValidator(
@@ -111,15 +132,18 @@ func registerDashboardAndFolderMigration(mg *sqlstoremigrator.Migrator, migrator
dashboards,
"dashboard",
"org_id = ? and is_folder = false",
driverName,
legacyEngine,
)
folderTreeValidator := NewFolderTreeValidator(client, folders)
folderTreeValidator := NewFolderTreeValidator(client, folders, driverName, legacyEngine)
dashboardsAndFolders := NewResourceMigration(
migrator,
[]schema.GroupResource{folders, dashboards},
"folders-dashboards",
[]Validator{folderCountValidator, dashboardCountValidator, folderTreeValidator},
legacyEngine,
)
mg.AddMigration("folders and dashboards migration", dashboardsAndFolders)
}

View File

@@ -5,6 +5,7 @@ import (
"fmt"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
"github.com/grafana/grafana/pkg/util/xorm"
"k8s.io/apimachinery/pkg/runtime/schema"
@@ -52,11 +53,13 @@ func filterResponse(response *resourcepb.BulkResponse, resources []schema.GroupR
}
type CountValidator struct {
name string
client resourcepb.ResourceIndexClient
resource schema.GroupResource
table string
whereClause string
name string
client resourcepb.ResourceIndexClient
resource schema.GroupResource
table string
whereClause string
driverName string
legacyEngine *xorm.Engine
}
func NewCountValidator(
@@ -64,13 +67,17 @@ func NewCountValidator(
resource schema.GroupResource,
table string,
whereClause string,
driverName string,
legacy *xorm.Engine,
) Validator {
return &CountValidator{
name: "CountValidator",
client: client,
resource: resource,
table: table,
whereClause: whereClause,
name: "CountValidator",
client: client,
resource: resource,
table: table,
whereClause: whereClause,
driverName: driverName,
legacyEngine: legacy,
}
}
@@ -115,27 +122,37 @@ func (v *CountValidator) Validate(ctx context.Context, sess *xorm.Session, respo
return fmt.Errorf("invalid namespace %s: %w", summary.Namespace, err)
}
legacyCount, err := sess.Table(v.table).Where(v.whereClause, orgID).Count()
legacyCount, err := v.legacyEngine.Table(v.table).Where(v.whereClause, orgID).Count()
if err != nil {
return fmt.Errorf("failed to count %s: %w", v.table, err)
}
// Get unified storage count using GetStats API
statsResp, err := v.client.GetStats(ctx, &resourcepb.ResourceStatsRequest{
Namespace: summary.Namespace,
Kinds: []string{fmt.Sprintf("%s/%s", summary.Group, summary.Resource)},
})
if err != nil {
return fmt.Errorf("failed to get stats for %s/%s in namespace %s: %w",
summary.Group, summary.Resource, summary.Namespace, err)
}
// Find the count for this specific resource type
var unifiedCount int64
for _, stat := range statsResp.Stats {
if stat.Group == summary.Group && stat.Resource == summary.Resource {
unifiedCount = stat.Count
break
if v.driverName == migrator.SQLite {
unifiedCount, err = sess.Table("resource").
Where("namespace = ? AND `group` = ? AND resource = ?",
summary.Namespace, summary.Group, summary.Resource).
Count()
if err != nil {
return fmt.Errorf("failed to count resource table for %s/%s in namespace %s: %w",
summary.Group, summary.Resource, summary.Namespace, err)
}
} else {
// Get unified storage count using GetStats API
statsResp, err := v.client.GetStats(ctx, &resourcepb.ResourceStatsRequest{
Namespace: summary.Namespace,
Kinds: []string{fmt.Sprintf("%s/%s", summary.Group, summary.Resource)},
})
if err != nil {
return fmt.Errorf("failed to get stats for %s/%s in namespace %s: %w",
summary.Group, summary.Resource, summary.Namespace, err)
}
// Find the count for this specific resource type
for _, stat := range statsResp.Stats {
if stat.Group == summary.Group && stat.Resource == summary.Resource {
unifiedCount = stat.Count
break
}
}
}
@@ -162,19 +179,25 @@ func (v *CountValidator) Validate(ctx context.Context, sess *xorm.Session, respo
}
type FolderTreeValidator struct {
name string
client resourcepb.ResourceIndexClient
resource schema.GroupResource
name string
client resourcepb.ResourceIndexClient
resource schema.GroupResource
driverName string
legacyEngine *xorm.Engine
}
func NewFolderTreeValidator(
client resourcepb.ResourceIndexClient,
resource schema.GroupResource,
driverName string,
legacyEngine *xorm.Engine,
) Validator {
return &FolderTreeValidator{
name: "FolderTreeValidator",
client: client,
resource: resource,
name: "FolderTreeValidator",
client: client,
resource: resource,
driverName: driverName,
legacyEngine: legacyEngine,
}
}
@@ -185,6 +208,12 @@ type legacyFolder struct {
Title string `xorm:"title"`
}
// unifiedFolder is the row shape scanned from the "resource" table when the
// folder parent map is built straight from SQL (the SQLite validation path).
type unifiedFolder struct {
// GUID is selected by the query alongside name and folder.
GUID string `xorm:"guid"`
// Name becomes the key of the parent map.
Name string `xorm:"name"`
// Folder becomes the value of the parent map (the row's "folder" column).
Folder string `xorm:"folder"`
}
func (v *FolderTreeValidator) Name() string {
return v.name
}
@@ -212,13 +241,18 @@ func (v *FolderTreeValidator) Validate(ctx context.Context, sess *xorm.Session,
}
// Build legacy folder parent map
legacyParentMap, err := v.buildLegacyFolderParentMap(sess, orgID, log)
legacyParentMap, err := v.buildLegacyFolderParentMap(orgID, log)
if err != nil {
return fmt.Errorf("failed to build legacy folder parent map: %w", err)
}
// Build unified storage folder parent map
unifiedParentMap, err := v.buildUnifiedFolderParentMap(ctx, summary.Namespace, log)
var unifiedParentMap map[string]string
if v.driverName == migrator.SQLite {
unifiedParentMap, err = v.buildUnifiedFolderParentMapSQLite(sess, summary.Namespace, log)
} else {
unifiedParentMap, err = v.buildUnifiedFolderParentMap(ctx, summary.Namespace, log)
}
if err != nil {
return fmt.Errorf("failed to build unified folder parent map: %w", err)
}
@@ -270,10 +304,10 @@ func (v *FolderTreeValidator) Validate(ctx context.Context, sess *xorm.Session,
return nil
}
func (v *FolderTreeValidator) buildLegacyFolderParentMap(sess *xorm.Session, orgID int64, log log.Logger) (map[string]string, error) {
func (v *FolderTreeValidator) buildLegacyFolderParentMap(orgID int64, log log.Logger) (map[string]string, error) {
// Query all folders for this org
var folders []legacyFolder
err := sess.Table("dashboard").
err := v.legacyEngine.Table("dashboard").
Cols("id", "uid", "folder_uid", "title").
Where("org_id = ? AND is_folder = ?", orgID, true).
Find(&folders)
@@ -348,3 +382,30 @@ func (v *FolderTreeValidator) buildUnifiedFolderParentMap(ctx context.Context, n
return parentMap, nil
}
// buildUnifiedFolderParentMapSQLite builds a map from folder name to its
// "folder" column value by reading the unified storage "resource" table
// directly through sess, instead of going through the resource client.
// Validate selects this variant when the driver is SQLite.
//
// Rows are filtered by namespace and by the group/resource this validator was
// constructed with (v.resource), mirroring the filter the SQLite count query
// uses. A hard-coded resource name would not match the rows written for the
// configured resource and would silently yield an empty map.
//
// It returns an empty, non-nil map when no rows match, and an error only when
// the query itself fails.
func (v *FolderTreeValidator) buildUnifiedFolderParentMapSQLite(sess *xorm.Session, namespace string, log log.Logger) (map[string]string, error) {
	var folders []unifiedFolder
	// `group` is a reserved word, so it is quoted the same way as in the
	// CountValidator query against this table.
	err := sess.Table("resource").
		Cols("guid", "name", "folder").
		Where("namespace = ? AND `group` = ? AND resource = ?",
			namespace, v.resource.Group, v.resource.Resource).
		Find(&folders)
	if err != nil {
		return nil, fmt.Errorf("failed to query unified folders: %w", err)
	}

	parentMap := make(map[string]string, len(folders))
	for _, folder := range folders {
		parentMap[folder.Name] = folder.Folder
	}

	if len(parentMap) == 0 {
		log.Debug("No unified folders found for namespace", "namespace", namespace)
		return parentMap, nil
	}

	log.Debug("Built unified folder parent map",
		"folder_count", len(parentMap),
		"namespace", namespace)
	return parentMap, nil
}

View File

@@ -0,0 +1,25 @@
package resource
import (
"context"
"database/sql"
)
// transactionContextKey is the unexported key type under which a *sql.Tx is
// stored in a context; being unexported, it cannot collide with keys from
// other packages.
type transactionContextKey struct{}

// ContextWithTransaction derives a context from ctx that carries tx.
// It exists so SQLite migrations can share one transaction between the
// migration code and unified storage operations running in the same process.
func ContextWithTransaction(ctx context.Context, tx *sql.Tx) context.Context {
	var key transactionContextKey
	return context.WithValue(ctx, key, tx)
}

// TransactionFromContext returns the *sql.Tx previously attached with
// ContextWithTransaction, or nil when ctx carries none.
func TransactionFromContext(ctx context.Context) *sql.Tx {
	// The comma-ok assertion yields a nil *sql.Tx both when the key is absent
	// and when the stored value has an unexpected type.
	tx, _ := ctx.Value(transactionContextKey{}).(*sql.Tx)
	return tx
}

View File

@@ -8,6 +8,7 @@ import (
"sync"
"time"
"github.com/fullstorydev/grpchan/inprocgrpc"
"github.com/google/uuid"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -20,6 +21,7 @@ import (
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
"github.com/grafana/grafana/pkg/storage/unified/sql/db"
"github.com/grafana/grafana/pkg/storage/unified/sql/db/dbimpl"
"github.com/grafana/grafana/pkg/storage/unified/sql/dbutil"
"github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate"
)
@@ -111,6 +113,19 @@ func (b *backend) ProcessBulk(ctx context.Context, setting resource.BulkSettings
}
defer b.bulkLock.Finish(setting.Collection)
// If provided, reuse the inproc transaction for SQLite
if clientCtx := inprocgrpc.ClientContext(ctx); clientCtx != nil && b.dialect.DialectName() == "sqlite" {
if externalTx := resource.TransactionFromContext(clientCtx); externalTx != nil {
b.log.Info("Using SQLite transaction from client context")
rsp := &resourcepb.BulkResponse{}
err := b.processBulkWithTx(ctx, dbimpl.NewTx(externalTx), setting, iter, rsp)
if err != nil {
rsp.Error = resource.AsErrorResult(err)
}
return rsp
}
}
// We may want to first write parquet, then read parquet
if b.dialect.DialectName() == "sqlite" {
file, err := os.CreateTemp("", "grafana-bulk-export-*.parquet")
@@ -151,109 +166,134 @@ func (b *backend) ProcessBulk(ctx context.Context, setting resource.BulkSettings
func (b *backend) processBulk(ctx context.Context, setting resource.BulkSettings, iter resource.BulkRequestIterator) *resourcepb.BulkResponse {
rsp := &resourcepb.BulkResponse{}
err := b.db.WithTx(ctx, ReadCommitted, func(ctx context.Context, tx db.Tx) error {
rollbackWithError := func(err error) error {
txerr := tx.Rollback()
if txerr != nil {
b.log.Warn("rollback", "error", txerr)
} else {
b.log.Info("rollback")
return b.processBulkWithTx(ctx, tx, setting, iter, rsp)
})
if err != nil {
rsp.Error = resource.AsErrorResult(err)
}
return rsp
}
// processBulkWithTx performs the bulk operation using the provided transaction.
// This is used both when creating our own transaction and when reusing an external one.
func (b *backend) processBulkWithTx(ctx context.Context, tx db.Tx, setting resource.BulkSettings, iter resource.BulkRequestIterator, rsp *resourcepb.BulkResponse) error {
rollbackWithError := func(err error) error {
txerr := tx.Rollback()
if txerr != nil {
b.log.Warn("rollback", "error", txerr)
} else {
b.log.Info("rollback")
}
return err
}
bulk := &bulkWroker{
ctx: ctx,
tx: tx,
dialect: b.dialect,
logger: logging.FromContext(ctx),
}
// Calculate the RV based on incoming request timestamps
rv := newBulkRV()
summaries := make(map[string]*resourcepb.BulkResponse_Summary, len(setting.Collection))
// First clear everything in the transaction
if setting.RebuildCollection {
for _, key := range setting.Collection {
summary, err := bulk.deleteCollection(key)
if err != nil {
return rollbackWithError(err)
}
summaries[resource.NSGR(key)] = summary
rsp.Summary = append(rsp.Summary, summary)
}
} else {
for _, key := range setting.Collection {
summaries[resource.NSGR(key)] = &resourcepb.BulkResponse_Summary{
Namespace: key.Namespace,
Group: key.Group,
Resource: key.Resource,
}
}
}
obj := &unstructured.Unstructured{}
// Write each event into the history
for iter.Next() {
if iter.RollbackRequested() {
return rollbackWithError(nil)
}
req := iter.Request()
if req == nil {
return rollbackWithError(fmt.Errorf("missing request"))
}
rsp.Processed++
if req.Action == resourcepb.BulkRequest_UNKNOWN {
rsp.Rejected = append(rsp.Rejected, &resourcepb.BulkResponse_Rejected{
Key: req.Key,
Action: req.Action,
Error: "unknown action",
})
continue
}
err := obj.UnmarshalJSON(req.Value)
if err != nil {
rsp.Rejected = append(rsp.Rejected, &resourcepb.BulkResponse_Rejected{
Key: req.Key,
Action: req.Action,
Error: "unable to unmarshal json",
})
continue
}
// Write the event to history
if _, err := dbutil.Exec(ctx, tx, sqlResourceHistoryInsert, sqlResourceRequest{
SQLTemplate: sqltemplate.New(b.dialect),
WriteEvent: resource.WriteEvent{
Key: req.Key,
Type: resourcepb.WatchEvent_Type(req.Action),
Value: req.Value,
PreviousRV: -1, // Used for WATCH, but we want to skip watch events
},
Folder: req.Folder,
GUID: uuid.New().String(),
ResourceVersion: rv.next(obj),
}); err != nil {
return rollbackWithError(fmt.Errorf("insert into resource history: %w", err))
}
}
// Now update the resource table from history
for _, key := range setting.Collection {
k := fmt.Sprintf("%s/%s/%s", key.Namespace, key.Group, key.Resource)
summary := summaries[k]
if summary == nil {
return rollbackWithError(fmt.Errorf("missing summary key for: %s", k))
}
err := bulk.syncCollection(key, summary)
if err != nil {
return err
}
bulk := &bulkWroker{
ctx: ctx,
tx: tx,
dialect: b.dialect,
logger: logging.FromContext(ctx),
}
// Calculate the RV based on incoming request timestamps
rv := newBulkRV()
summaries := make(map[string]*resourcepb.BulkResponse_Summary, len(setting.Collection))
// First clear everything in the transaction
if setting.RebuildCollection {
for _, key := range setting.Collection {
summary, err := bulk.deleteCollection(key)
if err != nil {
return rollbackWithError(err)
if b.dialect.DialectName() == "sqlite" {
nextRV, err := b.rvManager.lock(ctx, tx, key.Group, key.Resource)
if err != nil {
b.log.Error("error locking RV", "error", err, "key", resource.NSGR(key))
} else {
b.log.Info("successfully locked RV", "nextRV", nextRV, "key", resource.NSGR(key))
// Save the incremented RV
if err := b.rvManager.saveRV(ctx, tx, key.Group, key.Resource, nextRV); err != nil {
b.log.Error("error saving RV", "error", err, "key", resource.NSGR(key))
} else {
b.log.Info("successfully saved RV", "rv", nextRV, "key", resource.NSGR(key))
}
summaries[resource.NSGR(key)] = summary
rsp.Summary = append(rsp.Summary, summary)
}
} else {
for _, key := range setting.Collection {
summaries[resource.NSGR(key)] = &resourcepb.BulkResponse_Summary{
Namespace: key.Namespace,
Group: key.Group,
Resource: key.Resource,
}
}
}
obj := &unstructured.Unstructured{}
// Write each event into the history
for iter.Next() {
if iter.RollbackRequested() {
return rollbackWithError(nil)
}
req := iter.Request()
if req == nil {
return rollbackWithError(fmt.Errorf("missing request"))
}
rsp.Processed++
if req.Action == resourcepb.BulkRequest_UNKNOWN {
rsp.Rejected = append(rsp.Rejected, &resourcepb.BulkResponse_Rejected{
Key: req.Key,
Action: req.Action,
Error: "unknown action",
})
continue
}
err := obj.UnmarshalJSON(req.Value)
if err != nil {
rsp.Rejected = append(rsp.Rejected, &resourcepb.BulkResponse_Rejected{
Key: req.Key,
Action: req.Action,
Error: "unable to unmarshal json",
})
continue
}
// Write the event to history
if _, err := dbutil.Exec(ctx, tx, sqlResourceHistoryInsert, sqlResourceRequest{
SQLTemplate: sqltemplate.New(b.dialect),
WriteEvent: resource.WriteEvent{
Key: req.Key,
Type: resourcepb.WatchEvent_Type(req.Action),
Value: req.Value,
PreviousRV: -1, // Used for WATCH, but we want to skip watch events
},
Folder: req.Folder,
GUID: uuid.New().String(),
ResourceVersion: rv.next(obj),
}); err != nil {
return rollbackWithError(fmt.Errorf("insert into resource history: %w", err))
}
}
// Now update the resource table from history
for _, key := range setting.Collection {
k := fmt.Sprintf("%s/%s/%s", key.Namespace, key.Group, key.Resource)
summary := summaries[k]
if summary == nil {
return rollbackWithError(fmt.Errorf("missing summary key for: %s", k))
}
err := bulk.syncCollection(key, summary)
if err != nil {
return err
}
// Make sure the collection RV is above our last written event
_, err = b.rvManager.ExecWithRV(ctx, key, func(tx db.Tx) (string, error) {
return "", nil
@@ -261,19 +301,15 @@ func (b *backend) processBulk(ctx context.Context, setting resource.BulkSettings
if err != nil {
b.log.Warn("error increasing RV", "error", err)
}
// Update the last import time. This is important to trigger reindexing
// of the resource for a given namespace.
if err := b.updateLastImportTime(ctx, tx, key, time.Now()); err != nil {
return rollbackWithError(err)
}
}
return nil
})
if err != nil {
rsp.Error = resource.AsErrorResult(err)
// Update the last import time. This is important to trigger reindexing
// of the resource for a given namespace.
if err := b.updateLastImportTime(ctx, tx, key, time.Now()); err != nil {
return rollbackWithError(err)
}
}
return rsp
return nil
}
func (b *backend) updateLastImportTime(ctx context.Context, tx db.Tx, key *resourcepb.ResourceKey, now time.Time) error {

View File

@@ -48,6 +48,11 @@ type sqlTx struct {
*sql.Tx
}
// NewTx adapts an already-open *sql.Tx to the db.Tx interface by wrapping it
// in a sqlTx value. The given transaction is stored as-is; no validation
// (including a nil check) is performed here.
func NewTx(tx *sql.Tx) db.Tx {
	wrapped := sqlTx{Tx: tx}
	return wrapped
}
func (tx sqlTx) QueryContext(ctx context.Context, query string, args ...any) (db.Rows, error) {
// // codeql-suppress go/sql-query-built-from-user-controlled-sources "The query comes from a safe template source
// and the parameters are passed as arguments."

View File

@@ -103,6 +103,11 @@ func (p *resourceDBProvider) Init(ctx context.Context) (db.DB, error) {
return p.resourceDB, p.initErr
}
// GetEngine exposes the xorm.Engine held by this provider, intended for
// running data migrations. It only reads the provider's engine field and
// performs no initialization of its own.
func (p *resourceDBProvider) GetEngine() *xorm.Engine {
	engine := p.engine
	return engine
}
func (p *resourceDBProvider) initDB(ctx context.Context) (db.DB, error) {
p.log.Info("Initializing Resource DB",
"db_type",

View File

@@ -4,6 +4,8 @@ import (
"context"
"database/sql"
"fmt"
"github.com/grafana/grafana/pkg/util/xorm"
)
//go:generate mockery --with-expecter --name DB
@@ -19,6 +21,13 @@ type DBProvider interface {
Init(context.Context) (DB, error)
}
// EngineProvider is implemented by types that can hand out the xorm.Engine
// backing them. It is kept separate from DBProvider, so callers that need the
// engine must obtain a value that also satisfies this interface.
type EngineProvider interface {
// GetEngine returns the underlying xorm.Engine.
// It can be used to run data migrations to unified storage.
// NOTE(review): whether the result may be nil before initialization depends
// on the implementation — confirm before dereferencing.
GetEngine() *xorm.Engine
}
// DB is a thin abstraction on *sql.DB to allow mocking to provide better unit
// testing. We purposefully hide database operation methods that would use
// context.Background().

View File

@@ -19,6 +19,7 @@ import (
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/sql/db"
"github.com/grafana/grafana/pkg/storage/unified/sql/db/dbimpl"
)
@@ -32,6 +33,7 @@ type QOSEnqueueDequeuer interface {
type ServerOptions struct {
Backend resource.StorageBackend
DB infraDB.DB
DBProvider db.DBProvider // If provided, use this instead of creating a new one
Cfg *setting.Cfg
Tracer trace.Tracer
Reg prometheus.Registerer
@@ -91,9 +93,14 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
serverOptions.Backend = opts.Backend
// TODO: we should probably have a proper interface for diagnostics/lifecycle
} else {
eDB, err := dbimpl.ProvideResourceDB(opts.DB, opts.Cfg, opts.Tracer)
if err != nil {
return nil, err
// Use provided DBProvider if available, otherwise create a new one
eDB := opts.DBProvider
if eDB == nil {
var err error
eDB, err = dbimpl.ProvideResourceDB(opts.DB, opts.Cfg, opts.Tracer)
if err != nil {
return nil, err
}
}
isHA := isHighAvailabilityEnabled(opts.Cfg.SectionWithEnvOverrides("database"),

View File

@@ -7,6 +7,7 @@ package xorm
import (
"context"
"database/sql"
"errors"
"fmt"
"hash/crc32"
"reflect"
@@ -43,7 +44,7 @@ type Session struct {
afterProcessors []executedProcessor
prepareStmt bool
stmtCache map[uint32]*core.Stmt //key: hash.Hash32 of (queryStr, len(queryStr))
stmtCache map[uint32]*core.Stmt // key: hash.Hash32 of (queryStr, len(queryStr))
// !evalphobia! stored the last executed query on this session
lastSQL string
@@ -236,6 +237,14 @@ func (session *Session) DB() *core.DB {
return session.db
}
// Tx returns the transaction currently attached to the session.
// When the session holds no transaction (its tx field is nil), it returns a
// nil transaction together with a "no open transaction" error.
func (session *Session) Tx() (*core.Tx, error) {
	if tx := session.tx; tx != nil {
		return tx, nil
	}
	return nil, errors.New("no open transaction")
}
func cleanupProcessorsClosures(slices *[]func(any)) {
if len(*slices) > 0 {
*slices = make([]func(any), 0)