mirror of
https://github.com/grafana/grafana.git
synced 2025-12-20 19:44:55 +08:00
Compare commits
7 Commits
andrew/ela
...
fix-sqlite
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
848041d894 | ||
|
|
c007438e90 | ||
|
|
97804e3a66 | ||
|
|
58265aad2e | ||
|
|
b16c102c46 | ||
|
|
79f812ddb5 | ||
|
|
7063df0336 |
@@ -15,7 +15,6 @@ import (
|
||||
_ "github.com/blugelabs/bluge"
|
||||
_ "github.com/blugelabs/bluge_segment_api"
|
||||
_ "github.com/crewjam/saml"
|
||||
_ "github.com/docker/go-connections/nat"
|
||||
_ "github.com/go-jose/go-jose/v4"
|
||||
_ "github.com/gobwas/glob"
|
||||
_ "github.com/googleapis/gax-go/v2"
|
||||
@@ -31,7 +30,6 @@ import (
|
||||
_ "github.com/spf13/cobra" // used by the standalone apiserver cli
|
||||
_ "github.com/spyzhov/ajson"
|
||||
_ "github.com/stretchr/testify/require"
|
||||
_ "github.com/testcontainers/testcontainers-go"
|
||||
_ "gocloud.dev/secrets/awskms"
|
||||
_ "gocloud.dev/secrets/azurekeyvault"
|
||||
_ "gocloud.dev/secrets/gcpkms"
|
||||
@@ -56,7 +54,9 @@ import (
|
||||
_ "github.com/grafana/e2e"
|
||||
_ "github.com/grafana/gofpdf"
|
||||
_ "github.com/grafana/gomemcache/memcache"
|
||||
_ "github.com/grafana/tempo/pkg/traceql"
|
||||
|
||||
_ "github.com/grafana/grafana/apps/alerting/alertenrichment/pkg/apis/alertenrichment/v1beta1"
|
||||
_ "github.com/grafana/grafana/apps/scope/pkg/apis/scope/v0alpha1"
|
||||
_ "github.com/grafana/tempo/pkg/traceql"
|
||||
_ "github.com/testcontainers/testcontainers-go"
|
||||
)
|
||||
|
||||
@@ -146,6 +146,7 @@ var wireExtsBasicSet = wire.NewSet(
|
||||
wire.Bind(new(sandbox.Sandbox), new(*sandbox.Service)),
|
||||
wire.Struct(new(unified.Options), "*"),
|
||||
unified.ProvideUnifiedStorageClient,
|
||||
wire.Bind(new(resource.ResourceClient), new(*unified.ResourceClient)),
|
||||
sql.ProvideStorageBackend,
|
||||
builder.ProvideDefaultBuildHandlerChainFuncFromBuilders,
|
||||
aggregatorrunner.ProvideNoopAggregatorConfigurator,
|
||||
|
||||
@@ -33,7 +33,10 @@ import (
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resource"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/search"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/sql"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/sql/db"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/sql/db/dbimpl"
|
||||
"github.com/grafana/grafana/pkg/util/scheduler"
|
||||
"github.com/grafana/grafana/pkg/util/xorm"
|
||||
)
|
||||
|
||||
type Options struct {
|
||||
@@ -47,6 +50,21 @@ type Options struct {
|
||||
SecureValues secrets.InlineSecureValueSupport
|
||||
}
|
||||
|
||||
// ResourceClient wraps a ResourceClient
|
||||
type ResourceClient struct {
|
||||
resource.ResourceClient
|
||||
engineProvider db.EngineProvider
|
||||
}
|
||||
|
||||
// GetEngine returns the underlying xorm.Engine of the underlying resource server
|
||||
// Returns nil for gRPC or file-based storage modes.
|
||||
func (c *ResourceClient) GetEngine() *xorm.Engine {
|
||||
if c.engineProvider != nil {
|
||||
return c.engineProvider.GetEngine()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type clientMetrics struct {
|
||||
requestDuration *prometheus.HistogramVec
|
||||
requestRetries *prometheus.CounterVec
|
||||
@@ -56,9 +74,9 @@ type clientMetrics struct {
|
||||
func ProvideUnifiedStorageClient(opts *Options,
|
||||
storageMetrics *resource.StorageMetrics,
|
||||
indexMetrics *resource.BleveIndexMetrics,
|
||||
) (resource.ResourceClient, error) {
|
||||
) (*ResourceClient, error) {
|
||||
apiserverCfg := opts.Cfg.SectionWithEnvOverrides("grafana-apiserver")
|
||||
client, err := newClient(options.StorageOptions{
|
||||
client, engineProvider, err := newClient(options.StorageOptions{
|
||||
StorageType: options.StorageType(apiserverCfg.Key("storage_type").MustString(string(options.StorageTypeUnified))),
|
||||
DataPath: apiserverCfg.Key("storage_path").MustString(filepath.Join(opts.Cfg.DataPath, "grafana-apiserver")),
|
||||
Address: apiserverCfg.Key("address").MustString(""),
|
||||
@@ -67,34 +85,39 @@ func ProvideUnifiedStorageClient(opts *Options,
|
||||
BlobThresholdBytes: apiserverCfg.Key("blob_threshold_bytes").MustInt(options.BlobThresholdDefault),
|
||||
GrpcClientKeepaliveTime: apiserverCfg.Key("grpc_client_keepalive_time").MustDuration(0),
|
||||
}, opts.Cfg, opts.Features, opts.DB, opts.Tracer, opts.Reg, opts.Authzc, opts.Docs, storageMetrics, indexMetrics, opts.SecureValues)
|
||||
if err == nil {
|
||||
// Decide whether to disable SQL fallback stats per resource in Mode 5.
|
||||
// Otherwise we would still try to query the legacy SQL database in Mode 5.
|
||||
var disableDashboardsFallback, disableFoldersFallback bool
|
||||
if opts.Cfg != nil {
|
||||
// String are static here, so we don't need to import the packages.
|
||||
foldersMode := opts.Cfg.UnifiedStorage["folders.folder.grafana.app"].DualWriterMode
|
||||
disableFoldersFallback = foldersMode == grafanarest.Mode5
|
||||
dashboardsMode := opts.Cfg.UnifiedStorage["dashboards.dashboard.grafana.app"].DualWriterMode
|
||||
disableDashboardsFallback = dashboardsMode == grafanarest.Mode5
|
||||
}
|
||||
|
||||
// Used to get the folder stats
|
||||
client = federated.NewFederatedClient(
|
||||
client, // The original
|
||||
legacysql.NewDatabaseProvider(opts.DB),
|
||||
disableDashboardsFallback,
|
||||
disableFoldersFallback,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return client, err
|
||||
// Decide whether to disable SQL fallback stats per resource in Mode 5.
|
||||
// Otherwise we would still try to query the legacy SQL database in Mode 5.
|
||||
var disableDashboardsFallback, disableFoldersFallback bool
|
||||
if opts.Cfg != nil {
|
||||
// String are static here, so we don't need to import the packages.
|
||||
foldersMode := opts.Cfg.UnifiedStorage["folders.folder.grafana.app"].DualWriterMode
|
||||
disableFoldersFallback = foldersMode == grafanarest.Mode5
|
||||
dashboardsMode := opts.Cfg.UnifiedStorage["dashboards.dashboard.grafana.app"].DualWriterMode
|
||||
disableDashboardsFallback = dashboardsMode == grafanarest.Mode5
|
||||
}
|
||||
|
||||
// Used to get the folder stats
|
||||
federatedClient := federated.NewFederatedClient(
|
||||
client,
|
||||
legacysql.NewDatabaseProvider(opts.DB),
|
||||
disableDashboardsFallback,
|
||||
disableFoldersFallback,
|
||||
)
|
||||
|
||||
return &ResourceClient{
|
||||
ResourceClient: federatedClient,
|
||||
engineProvider: engineProvider,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func newClient(opts options.StorageOptions,
|
||||
cfg *setting.Cfg,
|
||||
features featuremgmt.FeatureToggles,
|
||||
db infraDB.DB,
|
||||
infradb infraDB.DB,
|
||||
tracer tracing.Tracer,
|
||||
reg prometheus.Registerer,
|
||||
authzc types.AccessClient,
|
||||
@@ -102,7 +125,7 @@ func newClient(opts options.StorageOptions,
|
||||
storageMetrics *resource.StorageMetrics,
|
||||
indexMetrics *resource.BleveIndexMetrics,
|
||||
secure secrets.InlineSecureValueSupport,
|
||||
) (resource.ResourceClient, error) {
|
||||
) (resource.ResourceClient, db.EngineProvider, error) {
|
||||
ctx := context.Background()
|
||||
|
||||
switch opts.StorageType {
|
||||
@@ -112,18 +135,18 @@ func newClient(opts options.StorageOptions,
|
||||
}
|
||||
|
||||
// Create BadgerDB instance
|
||||
db, err := badger.Open(badger.DefaultOptions(filepath.Join(opts.DataPath, "badger")).
|
||||
badgerDB, err := badger.Open(badger.DefaultOptions(filepath.Join(opts.DataPath, "badger")).
|
||||
WithLogger(nil))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
kv := resource.NewBadgerKV(db)
|
||||
kv := resource.NewBadgerKV(badgerDB)
|
||||
backend, err := resource.NewKVStorageBackend(resource.KVBackendOptions{
|
||||
KvStore: kv,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
server, err := resource.NewResourceServer(resource.ResourceServerOptions{
|
||||
@@ -133,13 +156,13 @@ func newClient(opts options.StorageOptions,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
return resource.NewLocalResourceClient(server), nil
|
||||
return resource.NewLocalResourceClient(server), nil, nil
|
||||
|
||||
case options.StorageTypeUnifiedGrpc:
|
||||
if opts.Address == "" {
|
||||
return nil, fmt.Errorf("expecting address for storage_type: %s", opts.StorageType)
|
||||
return nil, nil, fmt.Errorf("expecting address for storage_type: %s", opts.StorageType)
|
||||
}
|
||||
|
||||
var (
|
||||
@@ -151,30 +174,44 @@ func newClient(opts options.StorageOptions,
|
||||
|
||||
conn, err = newGrpcConn(opts.Address, metrics, features, opts.GrpcClientKeepaliveTime)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
if opts.SearchServerAddress != "" {
|
||||
indexConn, err = newGrpcConn(opts.SearchServerAddress, metrics, features, opts.GrpcClientKeepaliveTime)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
} else {
|
||||
indexConn = conn
|
||||
}
|
||||
|
||||
// Create a resource client
|
||||
return resource.NewResourceClient(conn, indexConn, cfg, features, tracer)
|
||||
client, err := resource.NewResourceClient(conn, indexConn, cfg, features, tracer)
|
||||
return client, nil, err
|
||||
|
||||
default:
|
||||
searchOptions, err := search.NewSearchOptions(features, cfg, tracer, docs, indexMetrics, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// Create DBProvider for SQL storage - this will be shared with migration service
|
||||
dbProvider, err := dbimpl.ProvideResourceDB(infradb, cfg, tracer)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// The concrete type implements both DBProvider and EngineProvider
|
||||
engineProvider, ok := dbProvider.(db.EngineProvider)
|
||||
if !ok {
|
||||
return nil, nil, fmt.Errorf("DBProvider does not implement EngineProvider")
|
||||
}
|
||||
|
||||
serverOptions := sql.ServerOptions{
|
||||
DB: db,
|
||||
DB: infradb,
|
||||
DBProvider: dbProvider,
|
||||
Cfg: cfg,
|
||||
Tracer: tracer,
|
||||
Reg: reg,
|
||||
@@ -194,28 +231,28 @@ func newClient(opts options.StorageOptions,
|
||||
Logger: cfg.Logger,
|
||||
})
|
||||
if err := services.StartAndAwaitRunning(ctx, queue); err != nil {
|
||||
return nil, fmt.Errorf("failed to start queue: %w", err)
|
||||
return nil, nil, fmt.Errorf("failed to start queue: %w", err)
|
||||
}
|
||||
scheduler, err := scheduler.NewScheduler(queue, &scheduler.Config{
|
||||
NumWorkers: cfg.QOSNumberWorker,
|
||||
Logger: cfg.Logger,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create scheduler: %w", err)
|
||||
return nil, nil, fmt.Errorf("failed to create scheduler: %w", err)
|
||||
}
|
||||
|
||||
err = services.StartAndAwaitRunning(ctx, scheduler)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to start scheduler: %w", err)
|
||||
return nil, nil, fmt.Errorf("failed to start scheduler: %w", err)
|
||||
}
|
||||
serverOptions.QOSQueue = queue
|
||||
}
|
||||
|
||||
server, err := sql.NewResourceServer(serverOptions)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
return resource.NewLocalResourceClient(server), nil
|
||||
return resource.NewLocalResourceClient(server), engineProvider, nil
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -31,7 +31,7 @@ func TestUnifiedStorageClient(t *testing.T) {
|
||||
resourceServer.resetCalls()
|
||||
indexServer.resetCalls()
|
||||
|
||||
client, err := newClient(
|
||||
client, _, err := newClient(
|
||||
options.StorageOptions{
|
||||
StorageType: options.StorageTypeUnifiedGrpc,
|
||||
Address: resourceServerAddress,
|
||||
@@ -64,7 +64,7 @@ func TestUnifiedStorageClient(t *testing.T) {
|
||||
resourceServer.resetCalls()
|
||||
indexServer.resetCalls()
|
||||
|
||||
client, err := newClient(
|
||||
client, _, err := newClient(
|
||||
options.StorageOptions{
|
||||
StorageType: options.StorageTypeUnifiedGrpc,
|
||||
Address: resourceServerAddress,
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/registry/apis/dashboard/legacy"
|
||||
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resource"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
||||
"github.com/grafana/grafana/pkg/util/xorm"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
@@ -25,11 +26,12 @@ type Validator interface {
|
||||
// ResourceMigration handles migration of specific resource types from legacy to unified storage.
|
||||
type ResourceMigration struct {
|
||||
migrator.MigrationBase
|
||||
migrator UnifiedMigrator
|
||||
resources []schema.GroupResource
|
||||
migrationID string
|
||||
validators []Validator // Optional: custom validation logic for this migration
|
||||
log log.Logger
|
||||
migrator UnifiedMigrator
|
||||
resources []schema.GroupResource
|
||||
migrationID string
|
||||
validators []Validator // Optional: custom validation logic for this migration
|
||||
log log.Logger
|
||||
legacyEngine *xorm.Engine
|
||||
}
|
||||
|
||||
// NewResourceMigration creates a new migration for the specified resources.
|
||||
@@ -38,13 +40,15 @@ func NewResourceMigration(
|
||||
resources []schema.GroupResource,
|
||||
migrationID string,
|
||||
validators []Validator,
|
||||
legacyEngine *xorm.Engine,
|
||||
) *ResourceMigration {
|
||||
return &ResourceMigration{
|
||||
migrator: migrator,
|
||||
resources: resources,
|
||||
migrationID: migrationID,
|
||||
validators: validators,
|
||||
log: log.New("storage.unified.resource_migration." + migrationID),
|
||||
migrator: migrator,
|
||||
resources: resources,
|
||||
migrationID: migrationID,
|
||||
validators: validators,
|
||||
legacyEngine: legacyEngine,
|
||||
log: log.New("storage.unified.resource_migration." + migrationID),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -59,7 +63,7 @@ func (m *ResourceMigration) SQL(_ migrator.Dialect) string {
|
||||
func (m *ResourceMigration) Exec(sess *xorm.Session, mg *migrator.Migrator) error {
|
||||
ctx := context.Background()
|
||||
|
||||
orgs, err := m.getAllOrgs(sess)
|
||||
orgs, err := m.getAllOrgs()
|
||||
if err != nil {
|
||||
m.log.Error("failed to get organizations", "error", err)
|
||||
return fmt.Errorf("failed to get organizations: %w", err)
|
||||
@@ -72,6 +76,17 @@ func (m *ResourceMigration) Exec(sess *xorm.Session, mg *migrator.Migrator) erro
|
||||
|
||||
m.log.Info("Starting migration for all organizations", "org_count", len(orgs), "resources", m.resources)
|
||||
|
||||
if mg.Dialect.DriverName() == migrator.SQLite {
|
||||
// reuse transaction in SQLite to avoid "database is locked" errors
|
||||
tx, err := sess.Tx()
|
||||
if err != nil {
|
||||
m.log.Error("Failed to get transaction from session", "error", err)
|
||||
return fmt.Errorf("failed to get transaction: %w", err)
|
||||
}
|
||||
ctx = resource.ContextWithTransaction(ctx, tx.Tx)
|
||||
m.log.Info("Stored migrator transaction in context for bulk operations (SQLite compatibility)")
|
||||
}
|
||||
|
||||
for _, org := range orgs {
|
||||
if err := m.migrateOrg(ctx, sess, org); err != nil {
|
||||
return err
|
||||
@@ -107,6 +122,10 @@ func (m *ResourceMigration) migrateOrg(ctx context.Context, sess *xorm.Session,
|
||||
m.log.Error("Migration failed", "org_id", org.ID, "error", err, "duration", time.Since(startTime))
|
||||
return fmt.Errorf("migration failed for org %d (%s): %w", org.ID, org.Name, err)
|
||||
}
|
||||
if response.Error != nil {
|
||||
m.log.Error("Migration reported error", "org_id", org.ID, "error", response.Error.String(), "duration", time.Since(startTime))
|
||||
return fmt.Errorf("migration failed for org %d (%s): %w", org.ID, org.Name, fmt.Errorf("migration error: %s", response.Error.Message))
|
||||
}
|
||||
|
||||
// Validate the migration results
|
||||
if err := m.validateMigration(migrationCtx, sess, response); err != nil {
|
||||
@@ -158,9 +177,9 @@ type orgInfo struct {
|
||||
}
|
||||
|
||||
// getAllOrgs retrieves all organizations from the database
|
||||
func (m *ResourceMigration) getAllOrgs(sess *xorm.Session) ([]orgInfo, error) {
|
||||
func (m *ResourceMigration) getAllOrgs() ([]orgInfo, error) {
|
||||
var orgs []orgInfo
|
||||
err := sess.Table("org").Cols("id", "name").Find(&orgs)
|
||||
err := m.legacyEngine.Table("org").Cols("id", "name").Find(&orgs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -10,8 +10,10 @@ import (
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
sqlstoremigrator "github.com/grafana/grafana/pkg/services/sqlstore/migrator"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
"github.com/grafana/grafana/pkg/storage/unified"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/migrations/contract"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resource"
|
||||
"github.com/grafana/grafana/pkg/util/xorm"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"go.opentelemetry.io/otel"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
@@ -21,11 +23,12 @@ var tracer = otel.Tracer("github.com/grafana/grafana/pkg/storage/unified/migrati
|
||||
var logger = log.New("storage.unified.migrations")
|
||||
|
||||
type UnifiedStorageMigrationServiceImpl struct {
|
||||
migrator UnifiedMigrator
|
||||
cfg *setting.Cfg
|
||||
sqlStore db.DB
|
||||
kv kvstore.KVStore
|
||||
client resource.ResourceClient
|
||||
migrator UnifiedMigrator
|
||||
cfg *setting.Cfg
|
||||
sqlStore db.DB
|
||||
kv kvstore.KVStore
|
||||
client *unified.ResourceClient
|
||||
resourceDBEngine *xorm.Engine // For SQLite, use unified storage's engine for data migrations
|
||||
}
|
||||
|
||||
var _ contract.UnifiedStorageMigrationService = (*UnifiedStorageMigrationServiceImpl)(nil)
|
||||
@@ -36,14 +39,23 @@ func ProvideUnifiedStorageMigrationService(
|
||||
cfg *setting.Cfg,
|
||||
sqlStore db.DB,
|
||||
kv kvstore.KVStore,
|
||||
client resource.ResourceClient,
|
||||
client *unified.ResourceClient,
|
||||
) contract.UnifiedStorageMigrationService {
|
||||
// Get engine from unified storage client, fallback to grafana core database engine
|
||||
resourceEngine := client.GetEngine()
|
||||
if resourceEngine != nil {
|
||||
logger.Info("Using Resource DB for unified storage migrations")
|
||||
} else {
|
||||
logger.Info("Using SQL Store DB for unified storage migrations")
|
||||
resourceEngine = sqlStore.GetEngine()
|
||||
}
|
||||
return &UnifiedStorageMigrationServiceImpl{
|
||||
migrator: migrator,
|
||||
cfg: cfg,
|
||||
sqlStore: sqlStore,
|
||||
kv: kv,
|
||||
client: client,
|
||||
migrator: migrator,
|
||||
cfg: cfg,
|
||||
sqlStore: sqlStore,
|
||||
kv: kv,
|
||||
client: client,
|
||||
resourceDBEngine: resourceEngine,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -61,7 +73,7 @@ func (p *UnifiedStorageMigrationServiceImpl) Run(ctx context.Context) error {
|
||||
|
||||
// TODO: Re-enable once migrations are ready
|
||||
// TODO: add guarantee that this only runs once
|
||||
// return RegisterMigrations(p.migrator, p.cfg, p.sqlStore, p.client)
|
||||
//return RegisterMigrations(p.migrator, p.cfg, p.sqlStore, p.client, p.resourceDBEngine)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -70,10 +82,11 @@ func RegisterMigrations(
|
||||
cfg *setting.Cfg,
|
||||
sqlStore db.DB,
|
||||
client resource.ResourceClient,
|
||||
resourceDBEngine *xorm.Engine,
|
||||
) error {
|
||||
ctx, span := tracer.Start(context.Background(), "storage.unified.RegisterMigrations")
|
||||
defer span.End()
|
||||
mg := sqlstoremigrator.NewScopedMigrator(sqlStore.GetEngine(), cfg, "unifiedstorage")
|
||||
mg := sqlstoremigrator.NewScopedMigrator(resourceDBEngine, cfg, "unifiedstorage")
|
||||
mg.AddCreateMigration()
|
||||
|
||||
if err := prometheus.Register(mg); err != nil {
|
||||
@@ -81,12 +94,17 @@ func RegisterMigrations(
|
||||
}
|
||||
|
||||
// Register resource migrations
|
||||
registerDashboardAndFolderMigration(mg, migrator, client)
|
||||
registerDashboardAndFolderMigration(mg, sqlStore.GetEngine(), migrator, client)
|
||||
|
||||
// Run all registered migrations (blocking)
|
||||
sec := cfg.Raw.Section("database")
|
||||
migrationLocking := sec.Key("migration_locking").MustBool(true)
|
||||
if mg.Dialect.DriverName() == sqlstoremigrator.SQLite {
|
||||
// disable migration locking for SQLite to avoid "database is locked" errors in the bulk operations
|
||||
migrationLocking = false
|
||||
}
|
||||
if err := mg.RunMigrations(ctx,
|
||||
sec.Key("migration_locking").MustBool(true),
|
||||
migrationLocking,
|
||||
sec.Key("locking_attempt_timeout_sec").MustInt()); err != nil {
|
||||
return fmt.Errorf("unified storage data migration failed: %w", err)
|
||||
}
|
||||
@@ -95,15 +113,18 @@ func RegisterMigrations(
|
||||
return nil
|
||||
}
|
||||
|
||||
func registerDashboardAndFolderMigration(mg *sqlstoremigrator.Migrator, migrator UnifiedMigrator, client resource.ResourceClient) {
|
||||
func registerDashboardAndFolderMigration(mg *sqlstoremigrator.Migrator, legacyEngine *xorm.Engine, migrator UnifiedMigrator, client resource.ResourceClient) {
|
||||
folders := schema.GroupResource{Group: "folder.grafana.app", Resource: "folders"}
|
||||
dashboards := schema.GroupResource{Group: "dashboard.grafana.app", Resource: "dashboards"}
|
||||
driverName := mg.Dialect.DriverName()
|
||||
|
||||
folderCountValidator := NewCountValidator(
|
||||
client,
|
||||
folders,
|
||||
"dashboard",
|
||||
"org_id = ? and is_folder = true",
|
||||
driverName,
|
||||
legacyEngine,
|
||||
)
|
||||
|
||||
dashboardCountValidator := NewCountValidator(
|
||||
@@ -111,15 +132,18 @@ func registerDashboardAndFolderMigration(mg *sqlstoremigrator.Migrator, migrator
|
||||
dashboards,
|
||||
"dashboard",
|
||||
"org_id = ? and is_folder = false",
|
||||
driverName,
|
||||
legacyEngine,
|
||||
)
|
||||
|
||||
folderTreeValidator := NewFolderTreeValidator(client, folders)
|
||||
folderTreeValidator := NewFolderTreeValidator(client, folders, driverName, legacyEngine)
|
||||
|
||||
dashboardsAndFolders := NewResourceMigration(
|
||||
migrator,
|
||||
[]schema.GroupResource{folders, dashboards},
|
||||
"folders-dashboards",
|
||||
[]Validator{folderCountValidator, dashboardCountValidator, folderTreeValidator},
|
||||
legacyEngine,
|
||||
)
|
||||
mg.AddMigration("folders and dashboards migration", dashboardsAndFolders)
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"fmt"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
||||
"github.com/grafana/grafana/pkg/util/xorm"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
@@ -52,11 +53,13 @@ func filterResponse(response *resourcepb.BulkResponse, resources []schema.GroupR
|
||||
}
|
||||
|
||||
type CountValidator struct {
|
||||
name string
|
||||
client resourcepb.ResourceIndexClient
|
||||
resource schema.GroupResource
|
||||
table string
|
||||
whereClause string
|
||||
name string
|
||||
client resourcepb.ResourceIndexClient
|
||||
resource schema.GroupResource
|
||||
table string
|
||||
whereClause string
|
||||
driverName string
|
||||
legacyEngine *xorm.Engine
|
||||
}
|
||||
|
||||
func NewCountValidator(
|
||||
@@ -64,13 +67,17 @@ func NewCountValidator(
|
||||
resource schema.GroupResource,
|
||||
table string,
|
||||
whereClause string,
|
||||
driverName string,
|
||||
legacy *xorm.Engine,
|
||||
) Validator {
|
||||
return &CountValidator{
|
||||
name: "CountValidator",
|
||||
client: client,
|
||||
resource: resource,
|
||||
table: table,
|
||||
whereClause: whereClause,
|
||||
name: "CountValidator",
|
||||
client: client,
|
||||
resource: resource,
|
||||
table: table,
|
||||
whereClause: whereClause,
|
||||
driverName: driverName,
|
||||
legacyEngine: legacy,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -115,27 +122,37 @@ func (v *CountValidator) Validate(ctx context.Context, sess *xorm.Session, respo
|
||||
return fmt.Errorf("invalid namespace %s: %w", summary.Namespace, err)
|
||||
}
|
||||
|
||||
legacyCount, err := sess.Table(v.table).Where(v.whereClause, orgID).Count()
|
||||
legacyCount, err := v.legacyEngine.Table(v.table).Where(v.whereClause, orgID).Count()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to count %s: %w", v.table, err)
|
||||
}
|
||||
|
||||
// Get unified storage count using GetStats API
|
||||
statsResp, err := v.client.GetStats(ctx, &resourcepb.ResourceStatsRequest{
|
||||
Namespace: summary.Namespace,
|
||||
Kinds: []string{fmt.Sprintf("%s/%s", summary.Group, summary.Resource)},
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get stats for %s/%s in namespace %s: %w",
|
||||
summary.Group, summary.Resource, summary.Namespace, err)
|
||||
}
|
||||
|
||||
// Find the count for this specific resource type
|
||||
var unifiedCount int64
|
||||
for _, stat := range statsResp.Stats {
|
||||
if stat.Group == summary.Group && stat.Resource == summary.Resource {
|
||||
unifiedCount = stat.Count
|
||||
break
|
||||
if v.driverName == migrator.SQLite {
|
||||
unifiedCount, err = sess.Table("resource").
|
||||
Where("namespace = ? AND `group` = ? AND resource = ?",
|
||||
summary.Namespace, summary.Group, summary.Resource).
|
||||
Count()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to count resource table for %s/%s in namespace %s: %w",
|
||||
summary.Group, summary.Resource, summary.Namespace, err)
|
||||
}
|
||||
} else {
|
||||
// Get unified storage count using GetStats API
|
||||
statsResp, err := v.client.GetStats(ctx, &resourcepb.ResourceStatsRequest{
|
||||
Namespace: summary.Namespace,
|
||||
Kinds: []string{fmt.Sprintf("%s/%s", summary.Group, summary.Resource)},
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get stats for %s/%s in namespace %s: %w",
|
||||
summary.Group, summary.Resource, summary.Namespace, err)
|
||||
}
|
||||
// Find the count for this specific resource type
|
||||
for _, stat := range statsResp.Stats {
|
||||
if stat.Group == summary.Group && stat.Resource == summary.Resource {
|
||||
unifiedCount = stat.Count
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -162,19 +179,25 @@ func (v *CountValidator) Validate(ctx context.Context, sess *xorm.Session, respo
|
||||
}
|
||||
|
||||
type FolderTreeValidator struct {
|
||||
name string
|
||||
client resourcepb.ResourceIndexClient
|
||||
resource schema.GroupResource
|
||||
name string
|
||||
client resourcepb.ResourceIndexClient
|
||||
resource schema.GroupResource
|
||||
driverName string
|
||||
legacyEngine *xorm.Engine
|
||||
}
|
||||
|
||||
func NewFolderTreeValidator(
|
||||
client resourcepb.ResourceIndexClient,
|
||||
resource schema.GroupResource,
|
||||
driverName string,
|
||||
legacyEngine *xorm.Engine,
|
||||
) Validator {
|
||||
return &FolderTreeValidator{
|
||||
name: "FolderTreeValidator",
|
||||
client: client,
|
||||
resource: resource,
|
||||
name: "FolderTreeValidator",
|
||||
client: client,
|
||||
resource: resource,
|
||||
driverName: driverName,
|
||||
legacyEngine: legacyEngine,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -185,6 +208,12 @@ type legacyFolder struct {
|
||||
Title string `xorm:"title"`
|
||||
}
|
||||
|
||||
type unifiedFolder struct {
|
||||
GUID string `xorm:"guid"`
|
||||
Name string `xorm:"name"`
|
||||
Folder string `xorm:"folder"`
|
||||
}
|
||||
|
||||
func (v *FolderTreeValidator) Name() string {
|
||||
return v.name
|
||||
}
|
||||
@@ -212,13 +241,18 @@ func (v *FolderTreeValidator) Validate(ctx context.Context, sess *xorm.Session,
|
||||
}
|
||||
|
||||
// Build legacy folder parent map
|
||||
legacyParentMap, err := v.buildLegacyFolderParentMap(sess, orgID, log)
|
||||
legacyParentMap, err := v.buildLegacyFolderParentMap(orgID, log)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to build legacy folder parent map: %w", err)
|
||||
}
|
||||
|
||||
// Build unified storage folder parent map
|
||||
unifiedParentMap, err := v.buildUnifiedFolderParentMap(ctx, summary.Namespace, log)
|
||||
var unifiedParentMap map[string]string
|
||||
if v.driverName == migrator.SQLite {
|
||||
unifiedParentMap, err = v.buildUnifiedFolderParentMapSQLite(sess, summary.Namespace, log)
|
||||
} else {
|
||||
unifiedParentMap, err = v.buildUnifiedFolderParentMap(ctx, summary.Namespace, log)
|
||||
}
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to build unified folder parent map: %w", err)
|
||||
}
|
||||
@@ -270,10 +304,10 @@ func (v *FolderTreeValidator) Validate(ctx context.Context, sess *xorm.Session,
|
||||
return nil
|
||||
}
|
||||
|
||||
func (v *FolderTreeValidator) buildLegacyFolderParentMap(sess *xorm.Session, orgID int64, log log.Logger) (map[string]string, error) {
|
||||
func (v *FolderTreeValidator) buildLegacyFolderParentMap(orgID int64, log log.Logger) (map[string]string, error) {
|
||||
// Query all folders for this org
|
||||
var folders []legacyFolder
|
||||
err := sess.Table("dashboard").
|
||||
err := v.legacyEngine.Table("dashboard").
|
||||
Cols("id", "uid", "folder_uid", "title").
|
||||
Where("org_id = ? AND is_folder = ?", orgID, true).
|
||||
Find(&folders)
|
||||
@@ -348,3 +382,30 @@ func (v *FolderTreeValidator) buildUnifiedFolderParentMap(ctx context.Context, n
|
||||
|
||||
return parentMap, nil
|
||||
}
|
||||
|
||||
func (v *FolderTreeValidator) buildUnifiedFolderParentMapSQLite(sess *xorm.Session, namespace string, log log.Logger) (map[string]string, error) {
|
||||
var folders []unifiedFolder
|
||||
err := sess.Table("resource").
|
||||
Cols("guid", "name", "folder").
|
||||
Where("namespace = ? AND resource = ?", namespace, "folder").
|
||||
Find(&folders)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to query unified folders: %w", err)
|
||||
}
|
||||
|
||||
parentMap := make(map[string]string)
|
||||
for _, folder := range folders {
|
||||
parentMap[folder.Name] = folder.Folder
|
||||
}
|
||||
|
||||
if len(parentMap) == 0 {
|
||||
log.Debug("No unified folders found for namespace", "namespace", namespace)
|
||||
return make(map[string]string), nil
|
||||
}
|
||||
|
||||
log.Debug("Built unified folder parent map",
|
||||
"folder_count", len(parentMap),
|
||||
"namespace", namespace)
|
||||
|
||||
return parentMap, nil
|
||||
}
|
||||
|
||||
25
pkg/storage/unified/resource/transaction.go
Normal file
25
pkg/storage/unified/resource/transaction.go
Normal file
@@ -0,0 +1,25 @@
|
||||
package resource
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
)
|
||||
|
||||
type transactionContextKey struct{}
|
||||
|
||||
// ContextWithTransaction returns a new context with the transaction stored directly.
|
||||
// This is used for SQLite migrations where the transaction needs to be shared
|
||||
// between the migration code and unified storage operations within the same process.
|
||||
func ContextWithTransaction(ctx context.Context, tx *sql.Tx) context.Context {
|
||||
return context.WithValue(ctx, transactionContextKey{}, tx)
|
||||
}
|
||||
|
||||
// TransactionFromContext retrieves the transaction from context
|
||||
func TransactionFromContext(ctx context.Context) *sql.Tx {
|
||||
if v := ctx.Value(transactionContextKey{}); v != nil {
|
||||
if tx, ok := v.(*sql.Tx); ok {
|
||||
return tx
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/fullstorydev/grpchan/inprocgrpc"
|
||||
"github.com/google/uuid"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
@@ -20,6 +21,7 @@ import (
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resource"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/sql/db"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/sql/db/dbimpl"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/sql/dbutil"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate"
|
||||
)
|
||||
@@ -111,6 +113,19 @@ func (b *backend) ProcessBulk(ctx context.Context, setting resource.BulkSettings
|
||||
}
|
||||
defer b.bulkLock.Finish(setting.Collection)
|
||||
|
||||
// If provided, reuse the inproc transaction for SQLite
|
||||
if clientCtx := inprocgrpc.ClientContext(ctx); clientCtx != nil && b.dialect.DialectName() == "sqlite" {
|
||||
if externalTx := resource.TransactionFromContext(clientCtx); externalTx != nil {
|
||||
b.log.Info("Using SQLite transaction from client context")
|
||||
rsp := &resourcepb.BulkResponse{}
|
||||
err := b.processBulkWithTx(ctx, dbimpl.NewTx(externalTx), setting, iter, rsp)
|
||||
if err != nil {
|
||||
rsp.Error = resource.AsErrorResult(err)
|
||||
}
|
||||
return rsp
|
||||
}
|
||||
}
|
||||
|
||||
// We may want to first write parquet, then read parquet
|
||||
if b.dialect.DialectName() == "sqlite" {
|
||||
file, err := os.CreateTemp("", "grafana-bulk-export-*.parquet")
|
||||
@@ -151,109 +166,134 @@ func (b *backend) ProcessBulk(ctx context.Context, setting resource.BulkSettings
|
||||
func (b *backend) processBulk(ctx context.Context, setting resource.BulkSettings, iter resource.BulkRequestIterator) *resourcepb.BulkResponse {
|
||||
rsp := &resourcepb.BulkResponse{}
|
||||
err := b.db.WithTx(ctx, ReadCommitted, func(ctx context.Context, tx db.Tx) error {
|
||||
rollbackWithError := func(err error) error {
|
||||
txerr := tx.Rollback()
|
||||
if txerr != nil {
|
||||
b.log.Warn("rollback", "error", txerr)
|
||||
} else {
|
||||
b.log.Info("rollback")
|
||||
return b.processBulkWithTx(ctx, tx, setting, iter, rsp)
|
||||
})
|
||||
if err != nil {
|
||||
rsp.Error = resource.AsErrorResult(err)
|
||||
}
|
||||
return rsp
|
||||
}
|
||||
|
||||
// processBulkWithTx performs the bulk operation using the provided transaction.
|
||||
// This is used both when creating our own transaction and when reusing an external one.
|
||||
func (b *backend) processBulkWithTx(ctx context.Context, tx db.Tx, setting resource.BulkSettings, iter resource.BulkRequestIterator, rsp *resourcepb.BulkResponse) error {
|
||||
rollbackWithError := func(err error) error {
|
||||
txerr := tx.Rollback()
|
||||
if txerr != nil {
|
||||
b.log.Warn("rollback", "error", txerr)
|
||||
} else {
|
||||
b.log.Info("rollback")
|
||||
}
|
||||
return err
|
||||
}
|
||||
bulk := &bulkWroker{
|
||||
ctx: ctx,
|
||||
tx: tx,
|
||||
dialect: b.dialect,
|
||||
logger: logging.FromContext(ctx),
|
||||
}
|
||||
|
||||
// Calculate the RV based on incoming request timestamps
|
||||
rv := newBulkRV()
|
||||
|
||||
summaries := make(map[string]*resourcepb.BulkResponse_Summary, len(setting.Collection))
|
||||
|
||||
// First clear everything in the transaction
|
||||
if setting.RebuildCollection {
|
||||
for _, key := range setting.Collection {
|
||||
summary, err := bulk.deleteCollection(key)
|
||||
if err != nil {
|
||||
return rollbackWithError(err)
|
||||
}
|
||||
summaries[resource.NSGR(key)] = summary
|
||||
rsp.Summary = append(rsp.Summary, summary)
|
||||
}
|
||||
} else {
|
||||
for _, key := range setting.Collection {
|
||||
summaries[resource.NSGR(key)] = &resourcepb.BulkResponse_Summary{
|
||||
Namespace: key.Namespace,
|
||||
Group: key.Group,
|
||||
Resource: key.Resource,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
obj := &unstructured.Unstructured{}
|
||||
|
||||
// Write each event into the history
|
||||
for iter.Next() {
|
||||
if iter.RollbackRequested() {
|
||||
return rollbackWithError(nil)
|
||||
}
|
||||
req := iter.Request()
|
||||
if req == nil {
|
||||
return rollbackWithError(fmt.Errorf("missing request"))
|
||||
}
|
||||
rsp.Processed++
|
||||
|
||||
if req.Action == resourcepb.BulkRequest_UNKNOWN {
|
||||
rsp.Rejected = append(rsp.Rejected, &resourcepb.BulkResponse_Rejected{
|
||||
Key: req.Key,
|
||||
Action: req.Action,
|
||||
Error: "unknown action",
|
||||
})
|
||||
continue
|
||||
}
|
||||
|
||||
err := obj.UnmarshalJSON(req.Value)
|
||||
if err != nil {
|
||||
rsp.Rejected = append(rsp.Rejected, &resourcepb.BulkResponse_Rejected{
|
||||
Key: req.Key,
|
||||
Action: req.Action,
|
||||
Error: "unable to unmarshal json",
|
||||
})
|
||||
continue
|
||||
}
|
||||
|
||||
// Write the event to history
|
||||
if _, err := dbutil.Exec(ctx, tx, sqlResourceHistoryInsert, sqlResourceRequest{
|
||||
SQLTemplate: sqltemplate.New(b.dialect),
|
||||
WriteEvent: resource.WriteEvent{
|
||||
Key: req.Key,
|
||||
Type: resourcepb.WatchEvent_Type(req.Action),
|
||||
Value: req.Value,
|
||||
PreviousRV: -1, // Used for WATCH, but we want to skip watch events
|
||||
},
|
||||
Folder: req.Folder,
|
||||
GUID: uuid.New().String(),
|
||||
ResourceVersion: rv.next(obj),
|
||||
}); err != nil {
|
||||
return rollbackWithError(fmt.Errorf("insert into resource history: %w", err))
|
||||
}
|
||||
}
|
||||
|
||||
// Now update the resource table from history
|
||||
for _, key := range setting.Collection {
|
||||
k := fmt.Sprintf("%s/%s/%s", key.Namespace, key.Group, key.Resource)
|
||||
summary := summaries[k]
|
||||
if summary == nil {
|
||||
return rollbackWithError(fmt.Errorf("missing summary key for: %s", k))
|
||||
}
|
||||
|
||||
err := bulk.syncCollection(key, summary)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
bulk := &bulkWroker{
|
||||
ctx: ctx,
|
||||
tx: tx,
|
||||
dialect: b.dialect,
|
||||
logger: logging.FromContext(ctx),
|
||||
}
|
||||
|
||||
// Calculate the RV based on incoming request timestamps
|
||||
rv := newBulkRV()
|
||||
|
||||
summaries := make(map[string]*resourcepb.BulkResponse_Summary, len(setting.Collection))
|
||||
|
||||
// First clear everything in the transaction
|
||||
if setting.RebuildCollection {
|
||||
for _, key := range setting.Collection {
|
||||
summary, err := bulk.deleteCollection(key)
|
||||
if err != nil {
|
||||
return rollbackWithError(err)
|
||||
if b.dialect.DialectName() == "sqlite" {
|
||||
nextRV, err := b.rvManager.lock(ctx, tx, key.Group, key.Resource)
|
||||
if err != nil {
|
||||
b.log.Error("error locking RV", "error", err, "key", resource.NSGR(key))
|
||||
} else {
|
||||
b.log.Info("successfully locked RV", "nextRV", nextRV, "key", resource.NSGR(key))
|
||||
// Save the incremented RV
|
||||
if err := b.rvManager.saveRV(ctx, tx, key.Group, key.Resource, nextRV); err != nil {
|
||||
b.log.Error("error saving RV", "error", err, "key", resource.NSGR(key))
|
||||
} else {
|
||||
b.log.Info("successfully saved RV", "rv", nextRV, "key", resource.NSGR(key))
|
||||
}
|
||||
summaries[resource.NSGR(key)] = summary
|
||||
rsp.Summary = append(rsp.Summary, summary)
|
||||
}
|
||||
} else {
|
||||
for _, key := range setting.Collection {
|
||||
summaries[resource.NSGR(key)] = &resourcepb.BulkResponse_Summary{
|
||||
Namespace: key.Namespace,
|
||||
Group: key.Group,
|
||||
Resource: key.Resource,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
obj := &unstructured.Unstructured{}
|
||||
|
||||
// Write each event into the history
|
||||
for iter.Next() {
|
||||
if iter.RollbackRequested() {
|
||||
return rollbackWithError(nil)
|
||||
}
|
||||
req := iter.Request()
|
||||
if req == nil {
|
||||
return rollbackWithError(fmt.Errorf("missing request"))
|
||||
}
|
||||
rsp.Processed++
|
||||
|
||||
if req.Action == resourcepb.BulkRequest_UNKNOWN {
|
||||
rsp.Rejected = append(rsp.Rejected, &resourcepb.BulkResponse_Rejected{
|
||||
Key: req.Key,
|
||||
Action: req.Action,
|
||||
Error: "unknown action",
|
||||
})
|
||||
continue
|
||||
}
|
||||
|
||||
err := obj.UnmarshalJSON(req.Value)
|
||||
if err != nil {
|
||||
rsp.Rejected = append(rsp.Rejected, &resourcepb.BulkResponse_Rejected{
|
||||
Key: req.Key,
|
||||
Action: req.Action,
|
||||
Error: "unable to unmarshal json",
|
||||
})
|
||||
continue
|
||||
}
|
||||
|
||||
// Write the event to history
|
||||
if _, err := dbutil.Exec(ctx, tx, sqlResourceHistoryInsert, sqlResourceRequest{
|
||||
SQLTemplate: sqltemplate.New(b.dialect),
|
||||
WriteEvent: resource.WriteEvent{
|
||||
Key: req.Key,
|
||||
Type: resourcepb.WatchEvent_Type(req.Action),
|
||||
Value: req.Value,
|
||||
PreviousRV: -1, // Used for WATCH, but we want to skip watch events
|
||||
},
|
||||
Folder: req.Folder,
|
||||
GUID: uuid.New().String(),
|
||||
ResourceVersion: rv.next(obj),
|
||||
}); err != nil {
|
||||
return rollbackWithError(fmt.Errorf("insert into resource history: %w", err))
|
||||
}
|
||||
}
|
||||
|
||||
// Now update the resource table from history
|
||||
for _, key := range setting.Collection {
|
||||
k := fmt.Sprintf("%s/%s/%s", key.Namespace, key.Group, key.Resource)
|
||||
summary := summaries[k]
|
||||
if summary == nil {
|
||||
return rollbackWithError(fmt.Errorf("missing summary key for: %s", k))
|
||||
}
|
||||
|
||||
err := bulk.syncCollection(key, summary)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Make sure the collection RV is above our last written event
|
||||
_, err = b.rvManager.ExecWithRV(ctx, key, func(tx db.Tx) (string, error) {
|
||||
return "", nil
|
||||
@@ -261,19 +301,15 @@ func (b *backend) processBulk(ctx context.Context, setting resource.BulkSettings
|
||||
if err != nil {
|
||||
b.log.Warn("error increasing RV", "error", err)
|
||||
}
|
||||
|
||||
// Update the last import time. This is important to trigger reindexing
|
||||
// of the resource for a given namespace.
|
||||
if err := b.updateLastImportTime(ctx, tx, key, time.Now()); err != nil {
|
||||
return rollbackWithError(err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
rsp.Error = resource.AsErrorResult(err)
|
||||
|
||||
// Update the last import time. This is important to trigger reindexing
|
||||
// of the resource for a given namespace.
|
||||
if err := b.updateLastImportTime(ctx, tx, key, time.Now()); err != nil {
|
||||
return rollbackWithError(err)
|
||||
}
|
||||
}
|
||||
return rsp
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *backend) updateLastImportTime(ctx context.Context, tx db.Tx, key *resourcepb.ResourceKey, now time.Time) error {
|
||||
|
||||
@@ -48,6 +48,11 @@ type sqlTx struct {
|
||||
*sql.Tx
|
||||
}
|
||||
|
||||
// NewTx wraps an existing *sql.Tx with sqlTx
|
||||
func NewTx(tx *sql.Tx) db.Tx {
|
||||
return sqlTx{tx}
|
||||
}
|
||||
|
||||
func (tx sqlTx) QueryContext(ctx context.Context, query string, args ...any) (db.Rows, error) {
|
||||
// // codeql-suppress go/sql-query-built-from-user-controlled-sources "The query comes from a safe template source
|
||||
// and the parameters are passed as arguments."
|
||||
|
||||
@@ -103,6 +103,11 @@ func (p *resourceDBProvider) Init(ctx context.Context) (db.DB, error) {
|
||||
return p.resourceDB, p.initErr
|
||||
}
|
||||
|
||||
// GetEngine returns the underlying xorm.Engine for data migrations.
|
||||
func (p *resourceDBProvider) GetEngine() *xorm.Engine {
|
||||
return p.engine
|
||||
}
|
||||
|
||||
func (p *resourceDBProvider) initDB(ctx context.Context) (db.DB, error) {
|
||||
p.log.Info("Initializing Resource DB",
|
||||
"db_type",
|
||||
|
||||
@@ -4,6 +4,8 @@ import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
|
||||
"github.com/grafana/grafana/pkg/util/xorm"
|
||||
)
|
||||
|
||||
//go:generate mockery --with-expecter --name DB
|
||||
@@ -19,6 +21,13 @@ type DBProvider interface {
|
||||
Init(context.Context) (DB, error)
|
||||
}
|
||||
|
||||
// EngineProvider provides access to the underlying xorm.Engine.
|
||||
type EngineProvider interface {
|
||||
// GetEngine returns the underlying xorm.Engine.
|
||||
// It can be used to run data migrations to unified storage.
|
||||
GetEngine() *xorm.Engine
|
||||
}
|
||||
|
||||
// DB is a thin abstraction on *sql.DB to allow mocking to provide better unit
|
||||
// testing. We purposefully hide database operation methods that would use
|
||||
// context.Background().
|
||||
|
||||
@@ -19,6 +19,7 @@ import (
|
||||
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resource"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/sql/db"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/sql/db/dbimpl"
|
||||
)
|
||||
|
||||
@@ -32,6 +33,7 @@ type QOSEnqueueDequeuer interface {
|
||||
type ServerOptions struct {
|
||||
Backend resource.StorageBackend
|
||||
DB infraDB.DB
|
||||
DBProvider db.DBProvider // If provided, use this instead of creating a new one
|
||||
Cfg *setting.Cfg
|
||||
Tracer trace.Tracer
|
||||
Reg prometheus.Registerer
|
||||
@@ -91,9 +93,14 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
|
||||
serverOptions.Backend = opts.Backend
|
||||
// TODO: we should probably have a proper interface for diagnostics/lifecycle
|
||||
} else {
|
||||
eDB, err := dbimpl.ProvideResourceDB(opts.DB, opts.Cfg, opts.Tracer)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
// Use provided DBProvider if available, otherwise create a new one
|
||||
eDB := opts.DBProvider
|
||||
if eDB == nil {
|
||||
var err error
|
||||
eDB, err = dbimpl.ProvideResourceDB(opts.DB, opts.Cfg, opts.Tracer)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
isHA := isHighAvailabilityEnabled(opts.Cfg.SectionWithEnvOverrides("database"),
|
||||
|
||||
@@ -7,6 +7,7 @@ package xorm
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"hash/crc32"
|
||||
"reflect"
|
||||
@@ -43,7 +44,7 @@ type Session struct {
|
||||
afterProcessors []executedProcessor
|
||||
|
||||
prepareStmt bool
|
||||
stmtCache map[uint32]*core.Stmt //key: hash.Hash32 of (queryStr, len(queryStr))
|
||||
stmtCache map[uint32]*core.Stmt // key: hash.Hash32 of (queryStr, len(queryStr))
|
||||
|
||||
// !evalphobia! stored the last executed query on this session
|
||||
lastSQL string
|
||||
@@ -236,6 +237,14 @@ func (session *Session) DB() *core.DB {
|
||||
return session.db
|
||||
}
|
||||
|
||||
// Tx returns the underlying transaction
|
||||
func (session *Session) Tx() (*core.Tx, error) {
|
||||
if session.tx == nil {
|
||||
return nil, errors.New("no open transaction")
|
||||
}
|
||||
return session.tx, nil
|
||||
}
|
||||
|
||||
func cleanupProcessorsClosures(slices *[]func(any)) {
|
||||
if len(*slices) > 0 {
|
||||
*slices = make([]func(any), 0)
|
||||
|
||||
Reference in New Issue
Block a user