Compare commits

...

7 Commits

Author SHA1 Message Date
Rafael Paulovic
848041d894 pass resource db provider 2025-11-27 19:24:52 +01:00
Rafael Paulovic
c007438e90 revert changes to newResourceDBProvider 2025-11-27 14:29:46 +01:00
Rafael Paulovic
97804e3a66 Merge remote-tracking branch 'origin/main' into fix-sqlite-data-migration-pt2 2025-11-27 13:37:35 +01:00
Rafael Paulovic
58265aad2e fix: unified SQLite migration with SQLStore migrator 2025-11-27 12:59:33 +01:00
Rafael Paulovic
b16c102c46 chore: refactor test cases into interface 2025-11-27 12:34:55 +01:00
Rafael Paulovic
79f812ddb5 chore: add comment and adjust db path name 2025-11-26 11:21:50 +01:00
Rafael Paulovic
7063df0336 feat: unified storage migrations integration tests 2025-11-25 20:16:32 +01:00
14 changed files with 461 additions and 223 deletions

View File

@@ -15,7 +15,6 @@ import (
_ "github.com/blugelabs/bluge" _ "github.com/blugelabs/bluge"
_ "github.com/blugelabs/bluge_segment_api" _ "github.com/blugelabs/bluge_segment_api"
_ "github.com/crewjam/saml" _ "github.com/crewjam/saml"
_ "github.com/docker/go-connections/nat"
_ "github.com/go-jose/go-jose/v4" _ "github.com/go-jose/go-jose/v4"
_ "github.com/gobwas/glob" _ "github.com/gobwas/glob"
_ "github.com/googleapis/gax-go/v2" _ "github.com/googleapis/gax-go/v2"
@@ -31,7 +30,6 @@ import (
_ "github.com/spf13/cobra" // used by the standalone apiserver cli _ "github.com/spf13/cobra" // used by the standalone apiserver cli
_ "github.com/spyzhov/ajson" _ "github.com/spyzhov/ajson"
_ "github.com/stretchr/testify/require" _ "github.com/stretchr/testify/require"
_ "github.com/testcontainers/testcontainers-go"
_ "gocloud.dev/secrets/awskms" _ "gocloud.dev/secrets/awskms"
_ "gocloud.dev/secrets/azurekeyvault" _ "gocloud.dev/secrets/azurekeyvault"
_ "gocloud.dev/secrets/gcpkms" _ "gocloud.dev/secrets/gcpkms"
@@ -56,7 +54,9 @@ import (
_ "github.com/grafana/e2e" _ "github.com/grafana/e2e"
_ "github.com/grafana/gofpdf" _ "github.com/grafana/gofpdf"
_ "github.com/grafana/gomemcache/memcache" _ "github.com/grafana/gomemcache/memcache"
_ "github.com/grafana/tempo/pkg/traceql"
_ "github.com/grafana/grafana/apps/alerting/alertenrichment/pkg/apis/alertenrichment/v1beta1" _ "github.com/grafana/grafana/apps/alerting/alertenrichment/pkg/apis/alertenrichment/v1beta1"
_ "github.com/grafana/grafana/apps/scope/pkg/apis/scope/v0alpha1" _ "github.com/grafana/grafana/apps/scope/pkg/apis/scope/v0alpha1"
_ "github.com/grafana/tempo/pkg/traceql" _ "github.com/testcontainers/testcontainers-go"
) )

View File

@@ -146,6 +146,7 @@ var wireExtsBasicSet = wire.NewSet(
wire.Bind(new(sandbox.Sandbox), new(*sandbox.Service)), wire.Bind(new(sandbox.Sandbox), new(*sandbox.Service)),
wire.Struct(new(unified.Options), "*"), wire.Struct(new(unified.Options), "*"),
unified.ProvideUnifiedStorageClient, unified.ProvideUnifiedStorageClient,
wire.Bind(new(resource.ResourceClient), new(*unified.ResourceClient)),
sql.ProvideStorageBackend, sql.ProvideStorageBackend,
builder.ProvideDefaultBuildHandlerChainFuncFromBuilders, builder.ProvideDefaultBuildHandlerChainFuncFromBuilders,
aggregatorrunner.ProvideNoopAggregatorConfigurator, aggregatorrunner.ProvideNoopAggregatorConfigurator,

View File

@@ -33,7 +33,10 @@ import (
"github.com/grafana/grafana/pkg/storage/unified/resource" "github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/search" "github.com/grafana/grafana/pkg/storage/unified/search"
"github.com/grafana/grafana/pkg/storage/unified/sql" "github.com/grafana/grafana/pkg/storage/unified/sql"
"github.com/grafana/grafana/pkg/storage/unified/sql/db"
"github.com/grafana/grafana/pkg/storage/unified/sql/db/dbimpl"
"github.com/grafana/grafana/pkg/util/scheduler" "github.com/grafana/grafana/pkg/util/scheduler"
"github.com/grafana/grafana/pkg/util/xorm"
) )
type Options struct { type Options struct {
@@ -47,6 +50,21 @@ type Options struct {
SecureValues secrets.InlineSecureValueSupport SecureValues secrets.InlineSecureValueSupport
} }
// ResourceClient wraps a ResourceClient
type ResourceClient struct {
resource.ResourceClient
engineProvider db.EngineProvider
}
// GetEngine returns the underlying xorm.Engine of the underlying resource server
// Returns nil for gRPC or file-based storage modes.
func (c *ResourceClient) GetEngine() *xorm.Engine {
if c.engineProvider != nil {
return c.engineProvider.GetEngine()
}
return nil
}
type clientMetrics struct { type clientMetrics struct {
requestDuration *prometheus.HistogramVec requestDuration *prometheus.HistogramVec
requestRetries *prometheus.CounterVec requestRetries *prometheus.CounterVec
@@ -56,9 +74,9 @@ type clientMetrics struct {
func ProvideUnifiedStorageClient(opts *Options, func ProvideUnifiedStorageClient(opts *Options,
storageMetrics *resource.StorageMetrics, storageMetrics *resource.StorageMetrics,
indexMetrics *resource.BleveIndexMetrics, indexMetrics *resource.BleveIndexMetrics,
) (resource.ResourceClient, error) { ) (*ResourceClient, error) {
apiserverCfg := opts.Cfg.SectionWithEnvOverrides("grafana-apiserver") apiserverCfg := opts.Cfg.SectionWithEnvOverrides("grafana-apiserver")
client, err := newClient(options.StorageOptions{ client, engineProvider, err := newClient(options.StorageOptions{
StorageType: options.StorageType(apiserverCfg.Key("storage_type").MustString(string(options.StorageTypeUnified))), StorageType: options.StorageType(apiserverCfg.Key("storage_type").MustString(string(options.StorageTypeUnified))),
DataPath: apiserverCfg.Key("storage_path").MustString(filepath.Join(opts.Cfg.DataPath, "grafana-apiserver")), DataPath: apiserverCfg.Key("storage_path").MustString(filepath.Join(opts.Cfg.DataPath, "grafana-apiserver")),
Address: apiserverCfg.Key("address").MustString(""), Address: apiserverCfg.Key("address").MustString(""),
@@ -67,34 +85,39 @@ func ProvideUnifiedStorageClient(opts *Options,
BlobThresholdBytes: apiserverCfg.Key("blob_threshold_bytes").MustInt(options.BlobThresholdDefault), BlobThresholdBytes: apiserverCfg.Key("blob_threshold_bytes").MustInt(options.BlobThresholdDefault),
GrpcClientKeepaliveTime: apiserverCfg.Key("grpc_client_keepalive_time").MustDuration(0), GrpcClientKeepaliveTime: apiserverCfg.Key("grpc_client_keepalive_time").MustDuration(0),
}, opts.Cfg, opts.Features, opts.DB, opts.Tracer, opts.Reg, opts.Authzc, opts.Docs, storageMetrics, indexMetrics, opts.SecureValues) }, opts.Cfg, opts.Features, opts.DB, opts.Tracer, opts.Reg, opts.Authzc, opts.Docs, storageMetrics, indexMetrics, opts.SecureValues)
if err == nil { if err != nil {
// Decide whether to disable SQL fallback stats per resource in Mode 5. return nil, err
// Otherwise we would still try to query the legacy SQL database in Mode 5.
var disableDashboardsFallback, disableFoldersFallback bool
if opts.Cfg != nil {
// String are static here, so we don't need to import the packages.
foldersMode := opts.Cfg.UnifiedStorage["folders.folder.grafana.app"].DualWriterMode
disableFoldersFallback = foldersMode == grafanarest.Mode5
dashboardsMode := opts.Cfg.UnifiedStorage["dashboards.dashboard.grafana.app"].DualWriterMode
disableDashboardsFallback = dashboardsMode == grafanarest.Mode5
}
// Used to get the folder stats
client = federated.NewFederatedClient(
client, // The original
legacysql.NewDatabaseProvider(opts.DB),
disableDashboardsFallback,
disableFoldersFallback,
)
} }
return client, err // Decide whether to disable SQL fallback stats per resource in Mode 5.
// Otherwise we would still try to query the legacy SQL database in Mode 5.
var disableDashboardsFallback, disableFoldersFallback bool
if opts.Cfg != nil {
// String are static here, so we don't need to import the packages.
foldersMode := opts.Cfg.UnifiedStorage["folders.folder.grafana.app"].DualWriterMode
disableFoldersFallback = foldersMode == grafanarest.Mode5
dashboardsMode := opts.Cfg.UnifiedStorage["dashboards.dashboard.grafana.app"].DualWriterMode
disableDashboardsFallback = dashboardsMode == grafanarest.Mode5
}
// Used to get the folder stats
federatedClient := federated.NewFederatedClient(
client,
legacysql.NewDatabaseProvider(opts.DB),
disableDashboardsFallback,
disableFoldersFallback,
)
return &ResourceClient{
ResourceClient: federatedClient,
engineProvider: engineProvider,
}, nil
} }
func newClient(opts options.StorageOptions, func newClient(opts options.StorageOptions,
cfg *setting.Cfg, cfg *setting.Cfg,
features featuremgmt.FeatureToggles, features featuremgmt.FeatureToggles,
db infraDB.DB, infradb infraDB.DB,
tracer tracing.Tracer, tracer tracing.Tracer,
reg prometheus.Registerer, reg prometheus.Registerer,
authzc types.AccessClient, authzc types.AccessClient,
@@ -102,7 +125,7 @@ func newClient(opts options.StorageOptions,
storageMetrics *resource.StorageMetrics, storageMetrics *resource.StorageMetrics,
indexMetrics *resource.BleveIndexMetrics, indexMetrics *resource.BleveIndexMetrics,
secure secrets.InlineSecureValueSupport, secure secrets.InlineSecureValueSupport,
) (resource.ResourceClient, error) { ) (resource.ResourceClient, db.EngineProvider, error) {
ctx := context.Background() ctx := context.Background()
switch opts.StorageType { switch opts.StorageType {
@@ -112,18 +135,18 @@ func newClient(opts options.StorageOptions,
} }
// Create BadgerDB instance // Create BadgerDB instance
db, err := badger.Open(badger.DefaultOptions(filepath.Join(opts.DataPath, "badger")). badgerDB, err := badger.Open(badger.DefaultOptions(filepath.Join(opts.DataPath, "badger")).
WithLogger(nil)) WithLogger(nil))
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
kv := resource.NewBadgerKV(db) kv := resource.NewBadgerKV(badgerDB)
backend, err := resource.NewKVStorageBackend(resource.KVBackendOptions{ backend, err := resource.NewKVStorageBackend(resource.KVBackendOptions{
KvStore: kv, KvStore: kv,
}) })
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
server, err := resource.NewResourceServer(resource.ResourceServerOptions{ server, err := resource.NewResourceServer(resource.ResourceServerOptions{
@@ -133,13 +156,13 @@ func newClient(opts options.StorageOptions,
}, },
}) })
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
return resource.NewLocalResourceClient(server), nil return resource.NewLocalResourceClient(server), nil, nil
case options.StorageTypeUnifiedGrpc: case options.StorageTypeUnifiedGrpc:
if opts.Address == "" { if opts.Address == "" {
return nil, fmt.Errorf("expecting address for storage_type: %s", opts.StorageType) return nil, nil, fmt.Errorf("expecting address for storage_type: %s", opts.StorageType)
} }
var ( var (
@@ -151,30 +174,44 @@ func newClient(opts options.StorageOptions,
conn, err = newGrpcConn(opts.Address, metrics, features, opts.GrpcClientKeepaliveTime) conn, err = newGrpcConn(opts.Address, metrics, features, opts.GrpcClientKeepaliveTime)
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
if opts.SearchServerAddress != "" { if opts.SearchServerAddress != "" {
indexConn, err = newGrpcConn(opts.SearchServerAddress, metrics, features, opts.GrpcClientKeepaliveTime) indexConn, err = newGrpcConn(opts.SearchServerAddress, metrics, features, opts.GrpcClientKeepaliveTime)
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
} else { } else {
indexConn = conn indexConn = conn
} }
// Create a resource client // Create a resource client
return resource.NewResourceClient(conn, indexConn, cfg, features, tracer) client, err := resource.NewResourceClient(conn, indexConn, cfg, features, tracer)
return client, nil, err
default: default:
searchOptions, err := search.NewSearchOptions(features, cfg, tracer, docs, indexMetrics, nil) searchOptions, err := search.NewSearchOptions(features, cfg, tracer, docs, indexMetrics, nil)
if err != nil { if err != nil {
return nil, err return nil, nil, err
}
// Create DBProvider for SQL storage - this will be shared with migration service
dbProvider, err := dbimpl.ProvideResourceDB(infradb, cfg, tracer)
if err != nil {
return nil, nil, err
}
// The concrete type implements both DBProvider and EngineProvider
engineProvider, ok := dbProvider.(db.EngineProvider)
if !ok {
return nil, nil, fmt.Errorf("DBProvider does not implement EngineProvider")
} }
serverOptions := sql.ServerOptions{ serverOptions := sql.ServerOptions{
DB: db, DB: infradb,
DBProvider: dbProvider,
Cfg: cfg, Cfg: cfg,
Tracer: tracer, Tracer: tracer,
Reg: reg, Reg: reg,
@@ -194,28 +231,28 @@ func newClient(opts options.StorageOptions,
Logger: cfg.Logger, Logger: cfg.Logger,
}) })
if err := services.StartAndAwaitRunning(ctx, queue); err != nil { if err := services.StartAndAwaitRunning(ctx, queue); err != nil {
return nil, fmt.Errorf("failed to start queue: %w", err) return nil, nil, fmt.Errorf("failed to start queue: %w", err)
} }
scheduler, err := scheduler.NewScheduler(queue, &scheduler.Config{ scheduler, err := scheduler.NewScheduler(queue, &scheduler.Config{
NumWorkers: cfg.QOSNumberWorker, NumWorkers: cfg.QOSNumberWorker,
Logger: cfg.Logger, Logger: cfg.Logger,
}) })
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to create scheduler: %w", err) return nil, nil, fmt.Errorf("failed to create scheduler: %w", err)
} }
err = services.StartAndAwaitRunning(ctx, scheduler) err = services.StartAndAwaitRunning(ctx, scheduler)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to start scheduler: %w", err) return nil, nil, fmt.Errorf("failed to start scheduler: %w", err)
} }
serverOptions.QOSQueue = queue serverOptions.QOSQueue = queue
} }
server, err := sql.NewResourceServer(serverOptions) server, err := sql.NewResourceServer(serverOptions)
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
return resource.NewLocalResourceClient(server), nil return resource.NewLocalResourceClient(server), engineProvider, nil
} }
} }

View File

@@ -31,7 +31,7 @@ func TestUnifiedStorageClient(t *testing.T) {
resourceServer.resetCalls() resourceServer.resetCalls()
indexServer.resetCalls() indexServer.resetCalls()
client, err := newClient( client, _, err := newClient(
options.StorageOptions{ options.StorageOptions{
StorageType: options.StorageTypeUnifiedGrpc, StorageType: options.StorageTypeUnifiedGrpc,
Address: resourceServerAddress, Address: resourceServerAddress,
@@ -64,7 +64,7 @@ func TestUnifiedStorageClient(t *testing.T) {
resourceServer.resetCalls() resourceServer.resetCalls()
indexServer.resetCalls() indexServer.resetCalls()
client, err := newClient( client, _, err := newClient(
options.StorageOptions{ options.StorageOptions{
StorageType: options.StorageTypeUnifiedGrpc, StorageType: options.StorageTypeUnifiedGrpc,
Address: resourceServerAddress, Address: resourceServerAddress,

View File

@@ -10,6 +10,7 @@ import (
"github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/registry/apis/dashboard/legacy" "github.com/grafana/grafana/pkg/registry/apis/dashboard/legacy"
"github.com/grafana/grafana/pkg/services/sqlstore/migrator" "github.com/grafana/grafana/pkg/services/sqlstore/migrator"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb" "github.com/grafana/grafana/pkg/storage/unified/resourcepb"
"github.com/grafana/grafana/pkg/util/xorm" "github.com/grafana/grafana/pkg/util/xorm"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
@@ -25,11 +26,12 @@ type Validator interface {
// ResourceMigration handles migration of specific resource types from legacy to unified storage. // ResourceMigration handles migration of specific resource types from legacy to unified storage.
type ResourceMigration struct { type ResourceMigration struct {
migrator.MigrationBase migrator.MigrationBase
migrator UnifiedMigrator migrator UnifiedMigrator
resources []schema.GroupResource resources []schema.GroupResource
migrationID string migrationID string
validators []Validator // Optional: custom validation logic for this migration validators []Validator // Optional: custom validation logic for this migration
log log.Logger log log.Logger
legacyEngine *xorm.Engine
} }
// NewResourceMigration creates a new migration for the specified resources. // NewResourceMigration creates a new migration for the specified resources.
@@ -38,13 +40,15 @@ func NewResourceMigration(
resources []schema.GroupResource, resources []schema.GroupResource,
migrationID string, migrationID string,
validators []Validator, validators []Validator,
legacyEngine *xorm.Engine,
) *ResourceMigration { ) *ResourceMigration {
return &ResourceMigration{ return &ResourceMigration{
migrator: migrator, migrator: migrator,
resources: resources, resources: resources,
migrationID: migrationID, migrationID: migrationID,
validators: validators, validators: validators,
log: log.New("storage.unified.resource_migration." + migrationID), legacyEngine: legacyEngine,
log: log.New("storage.unified.resource_migration." + migrationID),
} }
} }
@@ -59,7 +63,7 @@ func (m *ResourceMigration) SQL(_ migrator.Dialect) string {
func (m *ResourceMigration) Exec(sess *xorm.Session, mg *migrator.Migrator) error { func (m *ResourceMigration) Exec(sess *xorm.Session, mg *migrator.Migrator) error {
ctx := context.Background() ctx := context.Background()
orgs, err := m.getAllOrgs(sess) orgs, err := m.getAllOrgs()
if err != nil { if err != nil {
m.log.Error("failed to get organizations", "error", err) m.log.Error("failed to get organizations", "error", err)
return fmt.Errorf("failed to get organizations: %w", err) return fmt.Errorf("failed to get organizations: %w", err)
@@ -72,6 +76,17 @@ func (m *ResourceMigration) Exec(sess *xorm.Session, mg *migrator.Migrator) erro
m.log.Info("Starting migration for all organizations", "org_count", len(orgs), "resources", m.resources) m.log.Info("Starting migration for all organizations", "org_count", len(orgs), "resources", m.resources)
if mg.Dialect.DriverName() == migrator.SQLite {
// reuse transaction in SQLite to avoid "database is locked" errors
tx, err := sess.Tx()
if err != nil {
m.log.Error("Failed to get transaction from session", "error", err)
return fmt.Errorf("failed to get transaction: %w", err)
}
ctx = resource.ContextWithTransaction(ctx, tx.Tx)
m.log.Info("Stored migrator transaction in context for bulk operations (SQLite compatibility)")
}
for _, org := range orgs { for _, org := range orgs {
if err := m.migrateOrg(ctx, sess, org); err != nil { if err := m.migrateOrg(ctx, sess, org); err != nil {
return err return err
@@ -107,6 +122,10 @@ func (m *ResourceMigration) migrateOrg(ctx context.Context, sess *xorm.Session,
m.log.Error("Migration failed", "org_id", org.ID, "error", err, "duration", time.Since(startTime)) m.log.Error("Migration failed", "org_id", org.ID, "error", err, "duration", time.Since(startTime))
return fmt.Errorf("migration failed for org %d (%s): %w", org.ID, org.Name, err) return fmt.Errorf("migration failed for org %d (%s): %w", org.ID, org.Name, err)
} }
if response.Error != nil {
m.log.Error("Migration reported error", "org_id", org.ID, "error", response.Error.String(), "duration", time.Since(startTime))
return fmt.Errorf("migration failed for org %d (%s): %w", org.ID, org.Name, fmt.Errorf("migration error: %s", response.Error.Message))
}
// Validate the migration results // Validate the migration results
if err := m.validateMigration(migrationCtx, sess, response); err != nil { if err := m.validateMigration(migrationCtx, sess, response); err != nil {
@@ -158,9 +177,9 @@ type orgInfo struct {
} }
// getAllOrgs retrieves all organizations from the database // getAllOrgs retrieves all organizations from the database
func (m *ResourceMigration) getAllOrgs(sess *xorm.Session) ([]orgInfo, error) { func (m *ResourceMigration) getAllOrgs() ([]orgInfo, error) {
var orgs []orgInfo var orgs []orgInfo
err := sess.Table("org").Cols("id", "name").Find(&orgs) err := m.legacyEngine.Table("org").Cols("id", "name").Find(&orgs)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -10,8 +10,10 @@ import (
"github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/infra/log"
sqlstoremigrator "github.com/grafana/grafana/pkg/services/sqlstore/migrator" sqlstoremigrator "github.com/grafana/grafana/pkg/services/sqlstore/migrator"
"github.com/grafana/grafana/pkg/setting" "github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/storage/unified"
"github.com/grafana/grafana/pkg/storage/unified/migrations/contract" "github.com/grafana/grafana/pkg/storage/unified/migrations/contract"
"github.com/grafana/grafana/pkg/storage/unified/resource" "github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/util/xorm"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel" "go.opentelemetry.io/otel"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
@@ -21,11 +23,12 @@ var tracer = otel.Tracer("github.com/grafana/grafana/pkg/storage/unified/migrati
var logger = log.New("storage.unified.migrations") var logger = log.New("storage.unified.migrations")
type UnifiedStorageMigrationServiceImpl struct { type UnifiedStorageMigrationServiceImpl struct {
migrator UnifiedMigrator migrator UnifiedMigrator
cfg *setting.Cfg cfg *setting.Cfg
sqlStore db.DB sqlStore db.DB
kv kvstore.KVStore kv kvstore.KVStore
client resource.ResourceClient client *unified.ResourceClient
resourceDBEngine *xorm.Engine // For SQLite, use unified storage's engine for data migrations
} }
var _ contract.UnifiedStorageMigrationService = (*UnifiedStorageMigrationServiceImpl)(nil) var _ contract.UnifiedStorageMigrationService = (*UnifiedStorageMigrationServiceImpl)(nil)
@@ -36,14 +39,23 @@ func ProvideUnifiedStorageMigrationService(
cfg *setting.Cfg, cfg *setting.Cfg,
sqlStore db.DB, sqlStore db.DB,
kv kvstore.KVStore, kv kvstore.KVStore,
client resource.ResourceClient, client *unified.ResourceClient,
) contract.UnifiedStorageMigrationService { ) contract.UnifiedStorageMigrationService {
// Get engine from unified storage client, fallback to grafana core database engine
resourceEngine := client.GetEngine()
if resourceEngine != nil {
logger.Info("Using Resource DB for unified storage migrations")
} else {
logger.Info("Using SQL Store DB for unified storage migrations")
resourceEngine = sqlStore.GetEngine()
}
return &UnifiedStorageMigrationServiceImpl{ return &UnifiedStorageMigrationServiceImpl{
migrator: migrator, migrator: migrator,
cfg: cfg, cfg: cfg,
sqlStore: sqlStore, sqlStore: sqlStore,
kv: kv, kv: kv,
client: client, client: client,
resourceDBEngine: resourceEngine,
} }
} }
@@ -61,7 +73,7 @@ func (p *UnifiedStorageMigrationServiceImpl) Run(ctx context.Context) error {
// TODO: Re-enable once migrations are ready // TODO: Re-enable once migrations are ready
// TODO: add guarantee that this only runs once // TODO: add guarantee that this only runs once
// return RegisterMigrations(p.migrator, p.cfg, p.sqlStore, p.client) //return RegisterMigrations(p.migrator, p.cfg, p.sqlStore, p.client, p.resourceDBEngine)
return nil return nil
} }
@@ -70,10 +82,11 @@ func RegisterMigrations(
cfg *setting.Cfg, cfg *setting.Cfg,
sqlStore db.DB, sqlStore db.DB,
client resource.ResourceClient, client resource.ResourceClient,
resourceDBEngine *xorm.Engine,
) error { ) error {
ctx, span := tracer.Start(context.Background(), "storage.unified.RegisterMigrations") ctx, span := tracer.Start(context.Background(), "storage.unified.RegisterMigrations")
defer span.End() defer span.End()
mg := sqlstoremigrator.NewScopedMigrator(sqlStore.GetEngine(), cfg, "unifiedstorage") mg := sqlstoremigrator.NewScopedMigrator(resourceDBEngine, cfg, "unifiedstorage")
mg.AddCreateMigration() mg.AddCreateMigration()
if err := prometheus.Register(mg); err != nil { if err := prometheus.Register(mg); err != nil {
@@ -81,12 +94,17 @@ func RegisterMigrations(
} }
// Register resource migrations // Register resource migrations
registerDashboardAndFolderMigration(mg, migrator, client) registerDashboardAndFolderMigration(mg, sqlStore.GetEngine(), migrator, client)
// Run all registered migrations (blocking) // Run all registered migrations (blocking)
sec := cfg.Raw.Section("database") sec := cfg.Raw.Section("database")
migrationLocking := sec.Key("migration_locking").MustBool(true)
if mg.Dialect.DriverName() == sqlstoremigrator.SQLite {
// disable migration locking for SQLite to avoid "database is locked" errors in the bulk operations
migrationLocking = false
}
if err := mg.RunMigrations(ctx, if err := mg.RunMigrations(ctx,
sec.Key("migration_locking").MustBool(true), migrationLocking,
sec.Key("locking_attempt_timeout_sec").MustInt()); err != nil { sec.Key("locking_attempt_timeout_sec").MustInt()); err != nil {
return fmt.Errorf("unified storage data migration failed: %w", err) return fmt.Errorf("unified storage data migration failed: %w", err)
} }
@@ -95,15 +113,18 @@ func RegisterMigrations(
return nil return nil
} }
func registerDashboardAndFolderMigration(mg *sqlstoremigrator.Migrator, migrator UnifiedMigrator, client resource.ResourceClient) { func registerDashboardAndFolderMigration(mg *sqlstoremigrator.Migrator, legacyEngine *xorm.Engine, migrator UnifiedMigrator, client resource.ResourceClient) {
folders := schema.GroupResource{Group: "folder.grafana.app", Resource: "folders"} folders := schema.GroupResource{Group: "folder.grafana.app", Resource: "folders"}
dashboards := schema.GroupResource{Group: "dashboard.grafana.app", Resource: "dashboards"} dashboards := schema.GroupResource{Group: "dashboard.grafana.app", Resource: "dashboards"}
driverName := mg.Dialect.DriverName()
folderCountValidator := NewCountValidator( folderCountValidator := NewCountValidator(
client, client,
folders, folders,
"dashboard", "dashboard",
"org_id = ? and is_folder = true", "org_id = ? and is_folder = true",
driverName,
legacyEngine,
) )
dashboardCountValidator := NewCountValidator( dashboardCountValidator := NewCountValidator(
@@ -111,15 +132,18 @@ func registerDashboardAndFolderMigration(mg *sqlstoremigrator.Migrator, migrator
dashboards, dashboards,
"dashboard", "dashboard",
"org_id = ? and is_folder = false", "org_id = ? and is_folder = false",
driverName,
legacyEngine,
) )
folderTreeValidator := NewFolderTreeValidator(client, folders) folderTreeValidator := NewFolderTreeValidator(client, folders, driverName, legacyEngine)
dashboardsAndFolders := NewResourceMigration( dashboardsAndFolders := NewResourceMigration(
migrator, migrator,
[]schema.GroupResource{folders, dashboards}, []schema.GroupResource{folders, dashboards},
"folders-dashboards", "folders-dashboards",
[]Validator{folderCountValidator, dashboardCountValidator, folderTreeValidator}, []Validator{folderCountValidator, dashboardCountValidator, folderTreeValidator},
legacyEngine,
) )
mg.AddMigration("folders and dashboards migration", dashboardsAndFolders) mg.AddMigration("folders and dashboards migration", dashboardsAndFolders)
} }

View File

@@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb" "github.com/grafana/grafana/pkg/storage/unified/resourcepb"
"github.com/grafana/grafana/pkg/util/xorm" "github.com/grafana/grafana/pkg/util/xorm"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
@@ -52,11 +53,13 @@ func filterResponse(response *resourcepb.BulkResponse, resources []schema.GroupR
} }
type CountValidator struct { type CountValidator struct {
name string name string
client resourcepb.ResourceIndexClient client resourcepb.ResourceIndexClient
resource schema.GroupResource resource schema.GroupResource
table string table string
whereClause string whereClause string
driverName string
legacyEngine *xorm.Engine
} }
func NewCountValidator( func NewCountValidator(
@@ -64,13 +67,17 @@ func NewCountValidator(
resource schema.GroupResource, resource schema.GroupResource,
table string, table string,
whereClause string, whereClause string,
driverName string,
legacy *xorm.Engine,
) Validator { ) Validator {
return &CountValidator{ return &CountValidator{
name: "CountValidator", name: "CountValidator",
client: client, client: client,
resource: resource, resource: resource,
table: table, table: table,
whereClause: whereClause, whereClause: whereClause,
driverName: driverName,
legacyEngine: legacy,
} }
} }
@@ -115,27 +122,37 @@ func (v *CountValidator) Validate(ctx context.Context, sess *xorm.Session, respo
return fmt.Errorf("invalid namespace %s: %w", summary.Namespace, err) return fmt.Errorf("invalid namespace %s: %w", summary.Namespace, err)
} }
legacyCount, err := sess.Table(v.table).Where(v.whereClause, orgID).Count() legacyCount, err := v.legacyEngine.Table(v.table).Where(v.whereClause, orgID).Count()
if err != nil { if err != nil {
return fmt.Errorf("failed to count %s: %w", v.table, err) return fmt.Errorf("failed to count %s: %w", v.table, err)
} }
// Get unified storage count using GetStats API
statsResp, err := v.client.GetStats(ctx, &resourcepb.ResourceStatsRequest{
Namespace: summary.Namespace,
Kinds: []string{fmt.Sprintf("%s/%s", summary.Group, summary.Resource)},
})
if err != nil {
return fmt.Errorf("failed to get stats for %s/%s in namespace %s: %w",
summary.Group, summary.Resource, summary.Namespace, err)
}
// Find the count for this specific resource type
var unifiedCount int64 var unifiedCount int64
for _, stat := range statsResp.Stats { if v.driverName == migrator.SQLite {
if stat.Group == summary.Group && stat.Resource == summary.Resource { unifiedCount, err = sess.Table("resource").
unifiedCount = stat.Count Where("namespace = ? AND `group` = ? AND resource = ?",
break summary.Namespace, summary.Group, summary.Resource).
Count()
if err != nil {
return fmt.Errorf("failed to count resource table for %s/%s in namespace %s: %w",
summary.Group, summary.Resource, summary.Namespace, err)
}
} else {
// Get unified storage count using GetStats API
statsResp, err := v.client.GetStats(ctx, &resourcepb.ResourceStatsRequest{
Namespace: summary.Namespace,
Kinds: []string{fmt.Sprintf("%s/%s", summary.Group, summary.Resource)},
})
if err != nil {
return fmt.Errorf("failed to get stats for %s/%s in namespace %s: %w",
summary.Group, summary.Resource, summary.Namespace, err)
}
// Find the count for this specific resource type
for _, stat := range statsResp.Stats {
if stat.Group == summary.Group && stat.Resource == summary.Resource {
unifiedCount = stat.Count
break
}
} }
} }
@@ -162,19 +179,25 @@ func (v *CountValidator) Validate(ctx context.Context, sess *xorm.Session, respo
} }
type FolderTreeValidator struct { type FolderTreeValidator struct {
name string name string
client resourcepb.ResourceIndexClient client resourcepb.ResourceIndexClient
resource schema.GroupResource resource schema.GroupResource
driverName string
legacyEngine *xorm.Engine
} }
func NewFolderTreeValidator( func NewFolderTreeValidator(
client resourcepb.ResourceIndexClient, client resourcepb.ResourceIndexClient,
resource schema.GroupResource, resource schema.GroupResource,
driverName string,
legacyEngine *xorm.Engine,
) Validator { ) Validator {
return &FolderTreeValidator{ return &FolderTreeValidator{
name: "FolderTreeValidator", name: "FolderTreeValidator",
client: client, client: client,
resource: resource, resource: resource,
driverName: driverName,
legacyEngine: legacyEngine,
} }
} }
@@ -185,6 +208,12 @@ type legacyFolder struct {
Title string `xorm:"title"` Title string `xorm:"title"`
} }
type unifiedFolder struct {
GUID string `xorm:"guid"`
Name string `xorm:"name"`
Folder string `xorm:"folder"`
}
func (v *FolderTreeValidator) Name() string { func (v *FolderTreeValidator) Name() string {
return v.name return v.name
} }
@@ -212,13 +241,18 @@ func (v *FolderTreeValidator) Validate(ctx context.Context, sess *xorm.Session,
} }
// Build legacy folder parent map // Build legacy folder parent map
legacyParentMap, err := v.buildLegacyFolderParentMap(sess, orgID, log) legacyParentMap, err := v.buildLegacyFolderParentMap(orgID, log)
if err != nil { if err != nil {
return fmt.Errorf("failed to build legacy folder parent map: %w", err) return fmt.Errorf("failed to build legacy folder parent map: %w", err)
} }
// Build unified storage folder parent map // Build unified storage folder parent map
unifiedParentMap, err := v.buildUnifiedFolderParentMap(ctx, summary.Namespace, log) var unifiedParentMap map[string]string
if v.driverName == migrator.SQLite {
unifiedParentMap, err = v.buildUnifiedFolderParentMapSQLite(sess, summary.Namespace, log)
} else {
unifiedParentMap, err = v.buildUnifiedFolderParentMap(ctx, summary.Namespace, log)
}
if err != nil { if err != nil {
return fmt.Errorf("failed to build unified folder parent map: %w", err) return fmt.Errorf("failed to build unified folder parent map: %w", err)
} }
@@ -270,10 +304,10 @@ func (v *FolderTreeValidator) Validate(ctx context.Context, sess *xorm.Session,
return nil return nil
} }
func (v *FolderTreeValidator) buildLegacyFolderParentMap(sess *xorm.Session, orgID int64, log log.Logger) (map[string]string, error) { func (v *FolderTreeValidator) buildLegacyFolderParentMap(orgID int64, log log.Logger) (map[string]string, error) {
// Query all folders for this org // Query all folders for this org
var folders []legacyFolder var folders []legacyFolder
err := sess.Table("dashboard"). err := v.legacyEngine.Table("dashboard").
Cols("id", "uid", "folder_uid", "title"). Cols("id", "uid", "folder_uid", "title").
Where("org_id = ? AND is_folder = ?", orgID, true). Where("org_id = ? AND is_folder = ?", orgID, true).
Find(&folders) Find(&folders)
@@ -348,3 +382,30 @@ func (v *FolderTreeValidator) buildUnifiedFolderParentMap(ctx context.Context, n
return parentMap, nil return parentMap, nil
} }
func (v *FolderTreeValidator) buildUnifiedFolderParentMapSQLite(sess *xorm.Session, namespace string, log log.Logger) (map[string]string, error) {
var folders []unifiedFolder
err := sess.Table("resource").
Cols("guid", "name", "folder").
Where("namespace = ? AND resource = ?", namespace, "folder").
Find(&folders)
if err != nil {
return nil, fmt.Errorf("failed to query unified folders: %w", err)
}
parentMap := make(map[string]string)
for _, folder := range folders {
parentMap[folder.Name] = folder.Folder
}
if len(parentMap) == 0 {
log.Debug("No unified folders found for namespace", "namespace", namespace)
return make(map[string]string), nil
}
log.Debug("Built unified folder parent map",
"folder_count", len(parentMap),
"namespace", namespace)
return parentMap, nil
}

View File

@@ -0,0 +1,25 @@
package resource
import (
"context"
"database/sql"
)
type transactionContextKey struct{}
// ContextWithTransaction returns a new context with the transaction stored directly.
// This is used for SQLite migrations where the transaction needs to be shared
// between the migration code and unified storage operations within the same process.
func ContextWithTransaction(ctx context.Context, tx *sql.Tx) context.Context {
return context.WithValue(ctx, transactionContextKey{}, tx)
}
// TransactionFromContext retrieves the transaction from context
func TransactionFromContext(ctx context.Context) *sql.Tx {
if v := ctx.Value(transactionContextKey{}); v != nil {
if tx, ok := v.(*sql.Tx); ok {
return tx
}
}
return nil
}

View File

@@ -8,6 +8,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/fullstorydev/grpchan/inprocgrpc"
"github.com/google/uuid" "github.com/google/uuid"
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -20,6 +21,7 @@ import (
"github.com/grafana/grafana/pkg/storage/unified/resource" "github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb" "github.com/grafana/grafana/pkg/storage/unified/resourcepb"
"github.com/grafana/grafana/pkg/storage/unified/sql/db" "github.com/grafana/grafana/pkg/storage/unified/sql/db"
"github.com/grafana/grafana/pkg/storage/unified/sql/db/dbimpl"
"github.com/grafana/grafana/pkg/storage/unified/sql/dbutil" "github.com/grafana/grafana/pkg/storage/unified/sql/dbutil"
"github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate" "github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate"
) )
@@ -111,6 +113,19 @@ func (b *backend) ProcessBulk(ctx context.Context, setting resource.BulkSettings
} }
defer b.bulkLock.Finish(setting.Collection) defer b.bulkLock.Finish(setting.Collection)
// If provided, reuse the inproc transaction for SQLite
if clientCtx := inprocgrpc.ClientContext(ctx); clientCtx != nil && b.dialect.DialectName() == "sqlite" {
if externalTx := resource.TransactionFromContext(clientCtx); externalTx != nil {
b.log.Info("Using SQLite transaction from client context")
rsp := &resourcepb.BulkResponse{}
err := b.processBulkWithTx(ctx, dbimpl.NewTx(externalTx), setting, iter, rsp)
if err != nil {
rsp.Error = resource.AsErrorResult(err)
}
return rsp
}
}
// We may want to first write parquet, then read parquet // We may want to first write parquet, then read parquet
if b.dialect.DialectName() == "sqlite" { if b.dialect.DialectName() == "sqlite" {
file, err := os.CreateTemp("", "grafana-bulk-export-*.parquet") file, err := os.CreateTemp("", "grafana-bulk-export-*.parquet")
@@ -151,109 +166,134 @@ func (b *backend) ProcessBulk(ctx context.Context, setting resource.BulkSettings
func (b *backend) processBulk(ctx context.Context, setting resource.BulkSettings, iter resource.BulkRequestIterator) *resourcepb.BulkResponse { func (b *backend) processBulk(ctx context.Context, setting resource.BulkSettings, iter resource.BulkRequestIterator) *resourcepb.BulkResponse {
rsp := &resourcepb.BulkResponse{} rsp := &resourcepb.BulkResponse{}
err := b.db.WithTx(ctx, ReadCommitted, func(ctx context.Context, tx db.Tx) error { err := b.db.WithTx(ctx, ReadCommitted, func(ctx context.Context, tx db.Tx) error {
rollbackWithError := func(err error) error { return b.processBulkWithTx(ctx, tx, setting, iter, rsp)
txerr := tx.Rollback() })
if txerr != nil { if err != nil {
b.log.Warn("rollback", "error", txerr) rsp.Error = resource.AsErrorResult(err)
} else { }
b.log.Info("rollback") return rsp
}
// processBulkWithTx performs the bulk operation using the provided transaction.
// This is used both when creating our own transaction and when reusing an external one.
func (b *backend) processBulkWithTx(ctx context.Context, tx db.Tx, setting resource.BulkSettings, iter resource.BulkRequestIterator, rsp *resourcepb.BulkResponse) error {
rollbackWithError := func(err error) error {
txerr := tx.Rollback()
if txerr != nil {
b.log.Warn("rollback", "error", txerr)
} else {
b.log.Info("rollback")
}
return err
}
bulk := &bulkWroker{
ctx: ctx,
tx: tx,
dialect: b.dialect,
logger: logging.FromContext(ctx),
}
// Calculate the RV based on incoming request timestamps
rv := newBulkRV()
summaries := make(map[string]*resourcepb.BulkResponse_Summary, len(setting.Collection))
// First clear everything in the transaction
if setting.RebuildCollection {
for _, key := range setting.Collection {
summary, err := bulk.deleteCollection(key)
if err != nil {
return rollbackWithError(err)
} }
summaries[resource.NSGR(key)] = summary
rsp.Summary = append(rsp.Summary, summary)
}
} else {
for _, key := range setting.Collection {
summaries[resource.NSGR(key)] = &resourcepb.BulkResponse_Summary{
Namespace: key.Namespace,
Group: key.Group,
Resource: key.Resource,
}
}
}
obj := &unstructured.Unstructured{}
// Write each event into the history
for iter.Next() {
if iter.RollbackRequested() {
return rollbackWithError(nil)
}
req := iter.Request()
if req == nil {
return rollbackWithError(fmt.Errorf("missing request"))
}
rsp.Processed++
if req.Action == resourcepb.BulkRequest_UNKNOWN {
rsp.Rejected = append(rsp.Rejected, &resourcepb.BulkResponse_Rejected{
Key: req.Key,
Action: req.Action,
Error: "unknown action",
})
continue
}
err := obj.UnmarshalJSON(req.Value)
if err != nil {
rsp.Rejected = append(rsp.Rejected, &resourcepb.BulkResponse_Rejected{
Key: req.Key,
Action: req.Action,
Error: "unable to unmarshal json",
})
continue
}
// Write the event to history
if _, err := dbutil.Exec(ctx, tx, sqlResourceHistoryInsert, sqlResourceRequest{
SQLTemplate: sqltemplate.New(b.dialect),
WriteEvent: resource.WriteEvent{
Key: req.Key,
Type: resourcepb.WatchEvent_Type(req.Action),
Value: req.Value,
PreviousRV: -1, // Used for WATCH, but we want to skip watch events
},
Folder: req.Folder,
GUID: uuid.New().String(),
ResourceVersion: rv.next(obj),
}); err != nil {
return rollbackWithError(fmt.Errorf("insert into resource history: %w", err))
}
}
// Now update the resource table from history
for _, key := range setting.Collection {
k := fmt.Sprintf("%s/%s/%s", key.Namespace, key.Group, key.Resource)
summary := summaries[k]
if summary == nil {
return rollbackWithError(fmt.Errorf("missing summary key for: %s", k))
}
err := bulk.syncCollection(key, summary)
if err != nil {
return err return err
} }
bulk := &bulkWroker{
ctx: ctx,
tx: tx,
dialect: b.dialect,
logger: logging.FromContext(ctx),
}
// Calculate the RV based on incoming request timestamps if b.dialect.DialectName() == "sqlite" {
rv := newBulkRV() nextRV, err := b.rvManager.lock(ctx, tx, key.Group, key.Resource)
if err != nil {
summaries := make(map[string]*resourcepb.BulkResponse_Summary, len(setting.Collection)) b.log.Error("error locking RV", "error", err, "key", resource.NSGR(key))
} else {
// First clear everything in the transaction b.log.Info("successfully locked RV", "nextRV", nextRV, "key", resource.NSGR(key))
if setting.RebuildCollection { // Save the incremented RV
for _, key := range setting.Collection { if err := b.rvManager.saveRV(ctx, tx, key.Group, key.Resource, nextRV); err != nil {
summary, err := bulk.deleteCollection(key) b.log.Error("error saving RV", "error", err, "key", resource.NSGR(key))
if err != nil { } else {
return rollbackWithError(err) b.log.Info("successfully saved RV", "rv", nextRV, "key", resource.NSGR(key))
} }
summaries[resource.NSGR(key)] = summary
rsp.Summary = append(rsp.Summary, summary)
} }
} else { } else {
for _, key := range setting.Collection {
summaries[resource.NSGR(key)] = &resourcepb.BulkResponse_Summary{
Namespace: key.Namespace,
Group: key.Group,
Resource: key.Resource,
}
}
}
obj := &unstructured.Unstructured{}
// Write each event into the history
for iter.Next() {
if iter.RollbackRequested() {
return rollbackWithError(nil)
}
req := iter.Request()
if req == nil {
return rollbackWithError(fmt.Errorf("missing request"))
}
rsp.Processed++
if req.Action == resourcepb.BulkRequest_UNKNOWN {
rsp.Rejected = append(rsp.Rejected, &resourcepb.BulkResponse_Rejected{
Key: req.Key,
Action: req.Action,
Error: "unknown action",
})
continue
}
err := obj.UnmarshalJSON(req.Value)
if err != nil {
rsp.Rejected = append(rsp.Rejected, &resourcepb.BulkResponse_Rejected{
Key: req.Key,
Action: req.Action,
Error: "unable to unmarshal json",
})
continue
}
// Write the event to history
if _, err := dbutil.Exec(ctx, tx, sqlResourceHistoryInsert, sqlResourceRequest{
SQLTemplate: sqltemplate.New(b.dialect),
WriteEvent: resource.WriteEvent{
Key: req.Key,
Type: resourcepb.WatchEvent_Type(req.Action),
Value: req.Value,
PreviousRV: -1, // Used for WATCH, but we want to skip watch events
},
Folder: req.Folder,
GUID: uuid.New().String(),
ResourceVersion: rv.next(obj),
}); err != nil {
return rollbackWithError(fmt.Errorf("insert into resource history: %w", err))
}
}
// Now update the resource table from history
for _, key := range setting.Collection {
k := fmt.Sprintf("%s/%s/%s", key.Namespace, key.Group, key.Resource)
summary := summaries[k]
if summary == nil {
return rollbackWithError(fmt.Errorf("missing summary key for: %s", k))
}
err := bulk.syncCollection(key, summary)
if err != nil {
return err
}
// Make sure the collection RV is above our last written event // Make sure the collection RV is above our last written event
_, err = b.rvManager.ExecWithRV(ctx, key, func(tx db.Tx) (string, error) { _, err = b.rvManager.ExecWithRV(ctx, key, func(tx db.Tx) (string, error) {
return "", nil return "", nil
@@ -261,19 +301,15 @@ func (b *backend) processBulk(ctx context.Context, setting resource.BulkSettings
if err != nil { if err != nil {
b.log.Warn("error increasing RV", "error", err) b.log.Warn("error increasing RV", "error", err)
} }
// Update the last import time. This is important to trigger reindexing
// of the resource for a given namespace.
if err := b.updateLastImportTime(ctx, tx, key, time.Now()); err != nil {
return rollbackWithError(err)
}
} }
return nil
}) // Update the last import time. This is important to trigger reindexing
if err != nil { // of the resource for a given namespace.
rsp.Error = resource.AsErrorResult(err) if err := b.updateLastImportTime(ctx, tx, key, time.Now()); err != nil {
return rollbackWithError(err)
}
} }
return rsp return nil
} }
func (b *backend) updateLastImportTime(ctx context.Context, tx db.Tx, key *resourcepb.ResourceKey, now time.Time) error { func (b *backend) updateLastImportTime(ctx context.Context, tx db.Tx, key *resourcepb.ResourceKey, now time.Time) error {

View File

@@ -48,6 +48,11 @@ type sqlTx struct {
*sql.Tx *sql.Tx
} }
// NewTx wraps an existing *sql.Tx with sqlTx
func NewTx(tx *sql.Tx) db.Tx {
return sqlTx{tx}
}
func (tx sqlTx) QueryContext(ctx context.Context, query string, args ...any) (db.Rows, error) { func (tx sqlTx) QueryContext(ctx context.Context, query string, args ...any) (db.Rows, error) {
// // codeql-suppress go/sql-query-built-from-user-controlled-sources "The query comes from a safe template source // // codeql-suppress go/sql-query-built-from-user-controlled-sources "The query comes from a safe template source
// and the parameters are passed as arguments." // and the parameters are passed as arguments."

View File

@@ -103,6 +103,11 @@ func (p *resourceDBProvider) Init(ctx context.Context) (db.DB, error) {
return p.resourceDB, p.initErr return p.resourceDB, p.initErr
} }
// GetEngine returns the underlying xorm.Engine for data migrations.
func (p *resourceDBProvider) GetEngine() *xorm.Engine {
return p.engine
}
func (p *resourceDBProvider) initDB(ctx context.Context) (db.DB, error) { func (p *resourceDBProvider) initDB(ctx context.Context) (db.DB, error) {
p.log.Info("Initializing Resource DB", p.log.Info("Initializing Resource DB",
"db_type", "db_type",

View File

@@ -4,6 +4,8 @@ import (
"context" "context"
"database/sql" "database/sql"
"fmt" "fmt"
"github.com/grafana/grafana/pkg/util/xorm"
) )
//go:generate mockery --with-expecter --name DB //go:generate mockery --with-expecter --name DB
@@ -19,6 +21,13 @@ type DBProvider interface {
Init(context.Context) (DB, error) Init(context.Context) (DB, error)
} }
// EngineProvider provides access to the underlying xorm.Engine.
type EngineProvider interface {
// GetEngine returns the underlying xorm.Engine.
// It can be used to run data migrations to unified storage.
GetEngine() *xorm.Engine
}
// DB is a thin abstraction on *sql.DB to allow mocking to provide better unit // DB is a thin abstraction on *sql.DB to allow mocking to provide better unit
// testing. We purposefully hide database operation methods that would use // testing. We purposefully hide database operation methods that would use
// context.Background(). // context.Background().

View File

@@ -19,6 +19,7 @@ import (
"github.com/grafana/grafana/pkg/services/sqlstore/migrator" "github.com/grafana/grafana/pkg/services/sqlstore/migrator"
"github.com/grafana/grafana/pkg/setting" "github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/storage/unified/resource" "github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/sql/db"
"github.com/grafana/grafana/pkg/storage/unified/sql/db/dbimpl" "github.com/grafana/grafana/pkg/storage/unified/sql/db/dbimpl"
) )
@@ -32,6 +33,7 @@ type QOSEnqueueDequeuer interface {
type ServerOptions struct { type ServerOptions struct {
Backend resource.StorageBackend Backend resource.StorageBackend
DB infraDB.DB DB infraDB.DB
DBProvider db.DBProvider // If provided, use this instead of creating a new one
Cfg *setting.Cfg Cfg *setting.Cfg
Tracer trace.Tracer Tracer trace.Tracer
Reg prometheus.Registerer Reg prometheus.Registerer
@@ -91,9 +93,14 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
serverOptions.Backend = opts.Backend serverOptions.Backend = opts.Backend
// TODO: we should probably have a proper interface for diagnostics/lifecycle // TODO: we should probably have a proper interface for diagnostics/lifecycle
} else { } else {
eDB, err := dbimpl.ProvideResourceDB(opts.DB, opts.Cfg, opts.Tracer) // Use provided DBProvider if available, otherwise create a new one
if err != nil { eDB := opts.DBProvider
return nil, err if eDB == nil {
var err error
eDB, err = dbimpl.ProvideResourceDB(opts.DB, opts.Cfg, opts.Tracer)
if err != nil {
return nil, err
}
} }
isHA := isHighAvailabilityEnabled(opts.Cfg.SectionWithEnvOverrides("database"), isHA := isHighAvailabilityEnabled(opts.Cfg.SectionWithEnvOverrides("database"),

View File

@@ -7,6 +7,7 @@ package xorm
import ( import (
"context" "context"
"database/sql" "database/sql"
"errors"
"fmt" "fmt"
"hash/crc32" "hash/crc32"
"reflect" "reflect"
@@ -43,7 +44,7 @@ type Session struct {
afterProcessors []executedProcessor afterProcessors []executedProcessor
prepareStmt bool prepareStmt bool
stmtCache map[uint32]*core.Stmt //key: hash.Hash32 of (queryStr, len(queryStr)) stmtCache map[uint32]*core.Stmt // key: hash.Hash32 of (queryStr, len(queryStr))
// !evalphobia! stored the last executed query on this session // !evalphobia! stored the last executed query on this session
lastSQL string lastSQL string
@@ -236,6 +237,14 @@ func (session *Session) DB() *core.DB {
return session.db return session.db
} }
// Tx returns the underlying transaction
func (session *Session) Tx() (*core.Tx, error) {
if session.tx == nil {
return nil, errors.New("no open transaction")
}
return session.tx, nil
}
func cleanupProcessorsClosures(slices *[]func(any)) { func cleanupProcessorsClosures(slices *[]func(any)) {
if len(*slices) > 0 { if len(*slices) > 0 {
*slices = make([]func(any), 0) *slices = make([]func(any), 0)