Mirror of https://github.com/grafana/grafana.git, synced 2025-12-22 04:34:27 +08:00

Compare commits: docs/grafa... → fix-sqlite

7 Commits
| Author | SHA1 | Date |
|---|---|---|
|  | 848041d894 |  |
|  | c007438e90 |  |
|  | 97804e3a66 |  |
|  | 58265aad2e |  |
|  | b16c102c46 |  |
|  | 79f812ddb5 |  |
|  | 7063df0336 |  |
@@ -15,7 +15,6 @@ import (
 	_ "github.com/blugelabs/bluge"
 	_ "github.com/blugelabs/bluge_segment_api"
 	_ "github.com/crewjam/saml"
-	_ "github.com/docker/go-connections/nat"
 	_ "github.com/go-jose/go-jose/v4"
 	_ "github.com/gobwas/glob"
 	_ "github.com/googleapis/gax-go/v2"
@@ -31,7 +30,6 @@ import (
 	_ "github.com/spf13/cobra" // used by the standalone apiserver cli
 	_ "github.com/spyzhov/ajson"
 	_ "github.com/stretchr/testify/require"
-	_ "github.com/testcontainers/testcontainers-go"
 	_ "gocloud.dev/secrets/awskms"
 	_ "gocloud.dev/secrets/azurekeyvault"
 	_ "gocloud.dev/secrets/gcpkms"
@@ -56,7 +54,9 @@ import (
 	_ "github.com/grafana/e2e"
 	_ "github.com/grafana/gofpdf"
 	_ "github.com/grafana/gomemcache/memcache"
+	_ "github.com/grafana/tempo/pkg/traceql"
+
 	_ "github.com/grafana/grafana/apps/alerting/alertenrichment/pkg/apis/alertenrichment/v1beta1"
 	_ "github.com/grafana/grafana/apps/scope/pkg/apis/scope/v0alpha1"
-	_ "github.com/grafana/tempo/pkg/traceql"
+	_ "github.com/testcontainers/testcontainers-go"
 )
@@ -146,6 +146,7 @@ var wireExtsBasicSet = wire.NewSet(
 	wire.Bind(new(sandbox.Sandbox), new(*sandbox.Service)),
 	wire.Struct(new(unified.Options), "*"),
 	unified.ProvideUnifiedStorageClient,
+	wire.Bind(new(resource.ResourceClient), new(*unified.ResourceClient)),
 	sql.ProvideStorageBackend,
 	builder.ProvideDefaultBuildHandlerChainFuncFromBuilders,
 	aggregatorrunner.ProvideNoopAggregatorConfigurator,
@@ -33,7 +33,10 @@ import (
 	"github.com/grafana/grafana/pkg/storage/unified/resource"
 	"github.com/grafana/grafana/pkg/storage/unified/search"
 	"github.com/grafana/grafana/pkg/storage/unified/sql"
+	"github.com/grafana/grafana/pkg/storage/unified/sql/db"
+	"github.com/grafana/grafana/pkg/storage/unified/sql/db/dbimpl"
 	"github.com/grafana/grafana/pkg/util/scheduler"
+	"github.com/grafana/grafana/pkg/util/xorm"
 )
 
 type Options struct {
@@ -47,6 +50,21 @@ type Options struct {
 	SecureValues secrets.InlineSecureValueSupport
 }
 
+// ResourceClient wraps a ResourceClient
+type ResourceClient struct {
+	resource.ResourceClient
+	engineProvider db.EngineProvider
+}
+
+// GetEngine returns the underlying xorm.Engine of the underlying resource server
+// Returns nil for gRPC or file-based storage modes.
+func (c *ResourceClient) GetEngine() *xorm.Engine {
+	if c.engineProvider != nil {
+		return c.engineProvider.GetEngine()
+	}
+	return nil
+}
+
 type clientMetrics struct {
 	requestDuration *prometheus.HistogramVec
 	requestRetries  *prometheus.CounterVec
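The wrapper type added here relies on Go interface embedding: every method of the embedded resource.ResourceClient is promoted onto *ResourceClient, so the struct keeps satisfying the full client interface while adding GetEngine on top, and GetEngine fails soft for storage modes that have no local engine. A minimal self-contained sketch of the pattern, using simplified stand-in types rather than the real grafana interfaces:

package main

import "fmt"

type Engine struct{ driver string }

type ResourceClient interface{ List() []string }

type EngineProvider interface{ GetEngine() *Engine }

// WrappedClient mirrors the shape of the ResourceClient wrapper above:
// the embedded interface supplies the client API, GetEngine is additive.
type WrappedClient struct {
	ResourceClient
	engineProvider EngineProvider
}

func (c *WrappedClient) GetEngine() *Engine {
	if c.engineProvider != nil {
		return c.engineProvider.GetEngine()
	}
	return nil // e.g. gRPC or file-based storage: no local engine
}

type fakeClient struct{}

func (fakeClient) List() []string { return []string{"dashboards"} }

func main() {
	c := &WrappedClient{ResourceClient: fakeClient{}}
	fmt.Println(c.List())             // promoted from the embedded client
	fmt.Println(c.GetEngine() == nil) // true: nothing wired in
}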
@@ -56,9 +74,9 @@ type clientMetrics struct {
 func ProvideUnifiedStorageClient(opts *Options,
 	storageMetrics *resource.StorageMetrics,
 	indexMetrics *resource.BleveIndexMetrics,
-) (resource.ResourceClient, error) {
+) (*ResourceClient, error) {
 	apiserverCfg := opts.Cfg.SectionWithEnvOverrides("grafana-apiserver")
-	client, err := newClient(options.StorageOptions{
+	client, engineProvider, err := newClient(options.StorageOptions{
 		StorageType: options.StorageType(apiserverCfg.Key("storage_type").MustString(string(options.StorageTypeUnified))),
 		DataPath:    apiserverCfg.Key("storage_path").MustString(filepath.Join(opts.Cfg.DataPath, "grafana-apiserver")),
 		Address:     apiserverCfg.Key("address").MustString(""),
@@ -67,34 +85,39 @@ func ProvideUnifiedStorageClient(opts *Options,
 		BlobThresholdBytes:      apiserverCfg.Key("blob_threshold_bytes").MustInt(options.BlobThresholdDefault),
 		GrpcClientKeepaliveTime: apiserverCfg.Key("grpc_client_keepalive_time").MustDuration(0),
 	}, opts.Cfg, opts.Features, opts.DB, opts.Tracer, opts.Reg, opts.Authzc, opts.Docs, storageMetrics, indexMetrics, opts.SecureValues)
-	if err == nil {
-		// Decide whether to disable SQL fallback stats per resource in Mode 5.
-		// Otherwise we would still try to query the legacy SQL database in Mode 5.
-		var disableDashboardsFallback, disableFoldersFallback bool
-		if opts.Cfg != nil {
-			// String are static here, so we don't need to import the packages.
-			foldersMode := opts.Cfg.UnifiedStorage["folders.folder.grafana.app"].DualWriterMode
-			disableFoldersFallback = foldersMode == grafanarest.Mode5
-			dashboardsMode := opts.Cfg.UnifiedStorage["dashboards.dashboard.grafana.app"].DualWriterMode
-			disableDashboardsFallback = dashboardsMode == grafanarest.Mode5
-		}
-
-		// Used to get the folder stats
-		client = federated.NewFederatedClient(
-			client, // The original
-			legacysql.NewDatabaseProvider(opts.DB),
-			disableDashboardsFallback,
-			disableFoldersFallback,
-		)
+	if err != nil {
+		return nil, err
 	}
 
-	return client, err
+	// Decide whether to disable SQL fallback stats per resource in Mode 5.
+	// Otherwise we would still try to query the legacy SQL database in Mode 5.
+	var disableDashboardsFallback, disableFoldersFallback bool
+	if opts.Cfg != nil {
+		// String are static here, so we don't need to import the packages.
+		foldersMode := opts.Cfg.UnifiedStorage["folders.folder.grafana.app"].DualWriterMode
+		disableFoldersFallback = foldersMode == grafanarest.Mode5
+		dashboardsMode := opts.Cfg.UnifiedStorage["dashboards.dashboard.grafana.app"].DualWriterMode
+		disableDashboardsFallback = dashboardsMode == grafanarest.Mode5
+	}
+
+	// Used to get the folder stats
+	federatedClient := federated.NewFederatedClient(
+		client,
+		legacysql.NewDatabaseProvider(opts.DB),
+		disableDashboardsFallback,
+		disableFoldersFallback,
+	)
+
+	return &ResourceClient{
+		ResourceClient: federatedClient,
+		engineProvider: engineProvider,
+	}, nil
 }
 
 func newClient(opts options.StorageOptions,
 	cfg *setting.Cfg,
 	features featuremgmt.FeatureToggles,
-	db infraDB.DB,
+	infradb infraDB.DB,
 	tracer tracing.Tracer,
 	reg prometheus.Registerer,
 	authzc types.AccessClient,
@@ -102,7 +125,7 @@ func newClient(opts options.StorageOptions,
 	storageMetrics *resource.StorageMetrics,
 	indexMetrics *resource.BleveIndexMetrics,
 	secure secrets.InlineSecureValueSupport,
-) (resource.ResourceClient, error) {
+) (resource.ResourceClient, db.EngineProvider, error) {
 	ctx := context.Background()
 
 	switch opts.StorageType {
@@ -112,18 +135,18 @@ func newClient(opts options.StorageOptions,
 		}
 
 		// Create BadgerDB instance
-		db, err := badger.Open(badger.DefaultOptions(filepath.Join(opts.DataPath, "badger")).
+		badgerDB, err := badger.Open(badger.DefaultOptions(filepath.Join(opts.DataPath, "badger")).
 			WithLogger(nil))
 		if err != nil {
-			return nil, err
+			return nil, nil, err
 		}
 
-		kv := resource.NewBadgerKV(db)
+		kv := resource.NewBadgerKV(badgerDB)
 		backend, err := resource.NewKVStorageBackend(resource.KVBackendOptions{
 			KvStore: kv,
 		})
 		if err != nil {
-			return nil, err
+			return nil, nil, err
 		}
 
 		server, err := resource.NewResourceServer(resource.ResourceServerOptions{
@@ -133,13 +156,13 @@ func newClient(opts options.StorageOptions,
 			},
 		})
 		if err != nil {
-			return nil, err
+			return nil, nil, err
 		}
-		return resource.NewLocalResourceClient(server), nil
+		return resource.NewLocalResourceClient(server), nil, nil
 
 	case options.StorageTypeUnifiedGrpc:
 		if opts.Address == "" {
-			return nil, fmt.Errorf("expecting address for storage_type: %s", opts.StorageType)
+			return nil, nil, fmt.Errorf("expecting address for storage_type: %s", opts.StorageType)
 		}
 
 		var (
@@ -151,30 +174,44 @@ func newClient(opts options.StorageOptions,
 
 		conn, err = newGrpcConn(opts.Address, metrics, features, opts.GrpcClientKeepaliveTime)
 		if err != nil {
-			return nil, err
+			return nil, nil, err
 		}
 
 		if opts.SearchServerAddress != "" {
 			indexConn, err = newGrpcConn(opts.SearchServerAddress, metrics, features, opts.GrpcClientKeepaliveTime)
 			if err != nil {
-				return nil, err
+				return nil, nil, err
 			}
 		} else {
 			indexConn = conn
 		}
 
 		// Create a resource client
-		return resource.NewResourceClient(conn, indexConn, cfg, features, tracer)
+		client, err := resource.NewResourceClient(conn, indexConn, cfg, features, tracer)
+		return client, nil, err
 
 	default:
 		searchOptions, err := search.NewSearchOptions(features, cfg, tracer, docs, indexMetrics, nil)
 		if err != nil {
-			return nil, err
+			return nil, nil, err
+		}
+
+		// Create DBProvider for SQL storage - this will be shared with migration service
+		dbProvider, err := dbimpl.ProvideResourceDB(infradb, cfg, tracer)
+		if err != nil {
+			return nil, nil, err
+		}
+
+		// The concrete type implements both DBProvider and EngineProvider
+		engineProvider, ok := dbProvider.(db.EngineProvider)
+		if !ok {
+			return nil, nil, fmt.Errorf("DBProvider does not implement EngineProvider")
 		}
 
 		serverOptions := sql.ServerOptions{
-			DB:     db,
+			DB:         infradb,
+			DBProvider: dbProvider,
 			Cfg:        cfg,
 			Tracer:     tracer,
 			Reg:        reg,
@@ -194,28 +231,28 @@ func newClient(opts options.StorageOptions,
 				Logger:     cfg.Logger,
 			})
 			if err := services.StartAndAwaitRunning(ctx, queue); err != nil {
-				return nil, fmt.Errorf("failed to start queue: %w", err)
+				return nil, nil, fmt.Errorf("failed to start queue: %w", err)
 			}
 			scheduler, err := scheduler.NewScheduler(queue, &scheduler.Config{
 				NumWorkers: cfg.QOSNumberWorker,
 				Logger:     cfg.Logger,
 			})
 			if err != nil {
-				return nil, fmt.Errorf("failed to create scheduler: %w", err)
+				return nil, nil, fmt.Errorf("failed to create scheduler: %w", err)
 			}
 
 			err = services.StartAndAwaitRunning(ctx, scheduler)
 			if err != nil {
-				return nil, fmt.Errorf("failed to start scheduler: %w", err)
+				return nil, nil, fmt.Errorf("failed to start scheduler: %w", err)
 			}
 			serverOptions.QOSQueue = queue
 		}
 
 		server, err := sql.NewResourceServer(serverOptions)
 		if err != nil {
-			return nil, err
+			return nil, nil, err
 		}
-		return resource.NewLocalResourceClient(server), nil
+		return resource.NewLocalResourceClient(server), engineProvider, nil
 	}
 }
@@ -31,7 +31,7 @@ func TestUnifiedStorageClient(t *testing.T) {
 		resourceServer.resetCalls()
 		indexServer.resetCalls()
 
-		client, err := newClient(
+		client, _, err := newClient(
 			options.StorageOptions{
 				StorageType: options.StorageTypeUnifiedGrpc,
 				Address:     resourceServerAddress,
@@ -64,7 +64,7 @@ func TestUnifiedStorageClient(t *testing.T) {
 		resourceServer.resetCalls()
 		indexServer.resetCalls()
 
-		client, err := newClient(
+		client, _, err := newClient(
 			options.StorageOptions{
 				StorageType: options.StorageTypeUnifiedGrpc,
 				Address:     resourceServerAddress,
@@ -10,6 +10,7 @@ import (
 	"github.com/grafana/grafana/pkg/infra/log"
 	"github.com/grafana/grafana/pkg/registry/apis/dashboard/legacy"
 	"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
+	"github.com/grafana/grafana/pkg/storage/unified/resource"
 	"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
 	"github.com/grafana/grafana/pkg/util/xorm"
 	"k8s.io/apimachinery/pkg/runtime/schema"
@@ -25,11 +26,12 @@ type Validator interface {
 // ResourceMigration handles migration of specific resource types from legacy to unified storage.
 type ResourceMigration struct {
 	migrator.MigrationBase
 	migrator     UnifiedMigrator
 	resources    []schema.GroupResource
 	migrationID  string
 	validators   []Validator // Optional: custom validation logic for this migration
 	log          log.Logger
+	legacyEngine *xorm.Engine
 }
 
 // NewResourceMigration creates a new migration for the specified resources.
@@ -38,13 +40,15 @@ func NewResourceMigration(
 	resources []schema.GroupResource,
 	migrationID string,
 	validators []Validator,
+	legacyEngine *xorm.Engine,
 ) *ResourceMigration {
 	return &ResourceMigration{
 		migrator:    migrator,
 		resources:   resources,
 		migrationID: migrationID,
 		validators:  validators,
-		log:         log.New("storage.unified.resource_migration." + migrationID),
+		legacyEngine: legacyEngine,
+		log:          log.New("storage.unified.resource_migration." + migrationID),
 	}
 }
 
@@ -59,7 +63,7 @@ func (m *ResourceMigration) SQL(_ migrator.Dialect) string {
 func (m *ResourceMigration) Exec(sess *xorm.Session, mg *migrator.Migrator) error {
 	ctx := context.Background()
 
-	orgs, err := m.getAllOrgs(sess)
+	orgs, err := m.getAllOrgs()
 	if err != nil {
 		m.log.Error("failed to get organizations", "error", err)
 		return fmt.Errorf("failed to get organizations: %w", err)
@@ -72,6 +76,17 @@ func (m *ResourceMigration) Exec(sess *xorm.Session, mg *migrator.Migrator) erro
 
 	m.log.Info("Starting migration for all organizations", "org_count", len(orgs), "resources", m.resources)
 
+	if mg.Dialect.DriverName() == migrator.SQLite {
+		// reuse transaction in SQLite to avoid "database is locked" errors
+		tx, err := sess.Tx()
+		if err != nil {
+			m.log.Error("Failed to get transaction from session", "error", err)
+			return fmt.Errorf("failed to get transaction: %w", err)
+		}
+		ctx = resource.ContextWithTransaction(ctx, tx.Tx)
+		m.log.Info("Stored migrator transaction in context for bulk operations (SQLite compatibility)")
+	}
+
 	for _, org := range orgs {
 		if err := m.migrateOrg(ctx, sess, org); err != nil {
 			return err
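The "database is locked" failure that this new branch works around is inherent to SQLite: once one connection holds a write transaction, writes from any other connection fail with SQLITE_BUSY, so downstream bulk writes must reuse the migrator's transaction instead of opening their own. A small self-contained reproduction, where the mattn/go-sqlite3 driver and the DSN are illustrative assumptions, not necessarily what Grafana ships with:

package main

import (
	"database/sql"
	"fmt"

	_ "github.com/mattn/go-sqlite3"
)

func main() {
	db, _ := sql.Open("sqlite3", "file:demo.db?_busy_timeout=100")
	db.SetMaxOpenConns(2) // guarantee a genuinely separate second connection
	_, _ = db.Exec(`CREATE TABLE IF NOT EXISTS t (id INTEGER)`)

	tx, _ := db.Begin()
	_, _ = tx.Exec(`INSERT INTO t VALUES (1)`) // tx now holds the write lock

	// A write on another connection fails once the busy timeout expires:
	// "database is locked".
	if _, err := db.Exec(`INSERT INTO t VALUES (2)`); err != nil {
		fmt.Println("second connection:", err)
	}
	// Reusing the transaction itself succeeds.
	if _, err := tx.Exec(`INSERT INTO t VALUES (3)`); err == nil {
		fmt.Println("same-transaction write ok")
	}
	_ = tx.Commit()
}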
@@ -107,6 +122,10 @@ func (m *ResourceMigration) migrateOrg(ctx context.Context, sess *xorm.Session,
 		m.log.Error("Migration failed", "org_id", org.ID, "error", err, "duration", time.Since(startTime))
 		return fmt.Errorf("migration failed for org %d (%s): %w", org.ID, org.Name, err)
 	}
+	if response.Error != nil {
+		m.log.Error("Migration reported error", "org_id", org.ID, "error", response.Error.String(), "duration", time.Since(startTime))
+		return fmt.Errorf("migration failed for org %d (%s): %w", org.ID, org.Name, fmt.Errorf("migration error: %s", response.Error.Message))
+	}
 
 	// Validate the migration results
 	if err := m.validateMigration(migrationCtx, sess, response); err != nil {
@@ -158,9 +177,9 @@ type orgInfo struct {
 }
 
 // getAllOrgs retrieves all organizations from the database
-func (m *ResourceMigration) getAllOrgs(sess *xorm.Session) ([]orgInfo, error) {
+func (m *ResourceMigration) getAllOrgs() ([]orgInfo, error) {
 	var orgs []orgInfo
-	err := sess.Table("org").Cols("id", "name").Find(&orgs)
+	err := m.legacyEngine.Table("org").Cols("id", "name").Find(&orgs)
 	if err != nil {
 		return nil, err
 	}
@@ -10,8 +10,10 @@ import (
 	"github.com/grafana/grafana/pkg/infra/log"
 	sqlstoremigrator "github.com/grafana/grafana/pkg/services/sqlstore/migrator"
 	"github.com/grafana/grafana/pkg/setting"
+	"github.com/grafana/grafana/pkg/storage/unified"
 	"github.com/grafana/grafana/pkg/storage/unified/migrations/contract"
 	"github.com/grafana/grafana/pkg/storage/unified/resource"
+	"github.com/grafana/grafana/pkg/util/xorm"
 	"github.com/prometheus/client_golang/prometheus"
 	"go.opentelemetry.io/otel"
 	"k8s.io/apimachinery/pkg/runtime/schema"
@@ -21,11 +23,12 @@ var tracer = otel.Tracer("github.com/grafana/grafana/pkg/storage/unified/migrati
 var logger = log.New("storage.unified.migrations")
 
 type UnifiedStorageMigrationServiceImpl struct {
 	migrator UnifiedMigrator
 	cfg      *setting.Cfg
 	sqlStore db.DB
 	kv       kvstore.KVStore
-	client   resource.ResourceClient
+	client   *unified.ResourceClient
+	resourceDBEngine *xorm.Engine // For SQLite, use unified storage's engine for data migrations
 }
 
 var _ contract.UnifiedStorageMigrationService = (*UnifiedStorageMigrationServiceImpl)(nil)
@@ -36,14 +39,23 @@ func ProvideUnifiedStorageMigrationService(
 	cfg *setting.Cfg,
 	sqlStore db.DB,
 	kv kvstore.KVStore,
-	client resource.ResourceClient,
+	client *unified.ResourceClient,
 ) contract.UnifiedStorageMigrationService {
+	// Get engine from unified storage client, fallback to grafana core database engine
+	resourceEngine := client.GetEngine()
+	if resourceEngine != nil {
+		logger.Info("Using Resource DB for unified storage migrations")
+	} else {
+		logger.Info("Using SQL Store DB for unified storage migrations")
+		resourceEngine = sqlStore.GetEngine()
+	}
 	return &UnifiedStorageMigrationServiceImpl{
 		migrator: migrator,
 		cfg:      cfg,
 		sqlStore: sqlStore,
 		kv:       kv,
 		client:   client,
+		resourceDBEngine: resourceEngine,
 	}
 }
 
@@ -61,7 +73,7 @@ func (p *UnifiedStorageMigrationServiceImpl) Run(ctx context.Context) error {
 
 	// TODO: Re-enable once migrations are ready
 	// TODO: add guarantee that this only runs once
-	// return RegisterMigrations(p.migrator, p.cfg, p.sqlStore, p.client)
+	//return RegisterMigrations(p.migrator, p.cfg, p.sqlStore, p.client, p.resourceDBEngine)
 	return nil
 }
 
@@ -70,10 +82,11 @@ func RegisterMigrations(
 	cfg *setting.Cfg,
 	sqlStore db.DB,
 	client resource.ResourceClient,
+	resourceDBEngine *xorm.Engine,
 ) error {
 	ctx, span := tracer.Start(context.Background(), "storage.unified.RegisterMigrations")
 	defer span.End()
-	mg := sqlstoremigrator.NewScopedMigrator(sqlStore.GetEngine(), cfg, "unifiedstorage")
+	mg := sqlstoremigrator.NewScopedMigrator(resourceDBEngine, cfg, "unifiedstorage")
 	mg.AddCreateMigration()
 
 	if err := prometheus.Register(mg); err != nil {
@@ -81,12 +94,17 @@
 	}
 
 	// Register resource migrations
-	registerDashboardAndFolderMigration(mg, migrator, client)
+	registerDashboardAndFolderMigration(mg, sqlStore.GetEngine(), migrator, client)
 
 	// Run all registered migrations (blocking)
 	sec := cfg.Raw.Section("database")
+	migrationLocking := sec.Key("migration_locking").MustBool(true)
+	if mg.Dialect.DriverName() == sqlstoremigrator.SQLite {
+		// disable migration locking for SQLite to avoid "database is locked" errors in the bulk operations
+		migrationLocking = false
+	}
 	if err := mg.RunMigrations(ctx,
-		sec.Key("migration_locking").MustBool(true),
+		migrationLocking,
 		sec.Key("locking_attempt_timeout_sec").MustInt()); err != nil {
 		return fmt.Errorf("unified storage data migration failed: %w", err)
 	}
@@ -95,15 +113,18 @@
 	return nil
 }
 
-func registerDashboardAndFolderMigration(mg *sqlstoremigrator.Migrator, migrator UnifiedMigrator, client resource.ResourceClient) {
+func registerDashboardAndFolderMigration(mg *sqlstoremigrator.Migrator, legacyEngine *xorm.Engine, migrator UnifiedMigrator, client resource.ResourceClient) {
 	folders := schema.GroupResource{Group: "folder.grafana.app", Resource: "folders"}
 	dashboards := schema.GroupResource{Group: "dashboard.grafana.app", Resource: "dashboards"}
+	driverName := mg.Dialect.DriverName()
 
 	folderCountValidator := NewCountValidator(
 		client,
 		folders,
 		"dashboard",
 		"org_id = ? and is_folder = true",
+		driverName,
+		legacyEngine,
 	)
 
 	dashboardCountValidator := NewCountValidator(
@@ -111,15 +132,18 @@ func registerDashboardAndFolderMigration(mg *sqlstoremigrator.Migrator, migrator
 		dashboards,
 		"dashboard",
 		"org_id = ? and is_folder = false",
+		driverName,
+		legacyEngine,
 	)
 
-	folderTreeValidator := NewFolderTreeValidator(client, folders)
+	folderTreeValidator := NewFolderTreeValidator(client, folders, driverName, legacyEngine)
 
 	dashboardsAndFolders := NewResourceMigration(
 		migrator,
 		[]schema.GroupResource{folders, dashboards},
 		"folders-dashboards",
 		[]Validator{folderCountValidator, dashboardCountValidator, folderTreeValidator},
+		legacyEngine,
 	)
 	mg.AddMigration("folders and dashboards migration", dashboardsAndFolders)
 }
@@ -5,6 +5,7 @@ import (
 	"fmt"
 
 	"github.com/grafana/grafana/pkg/infra/log"
+	"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
 	"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
 	"github.com/grafana/grafana/pkg/util/xorm"
 	"k8s.io/apimachinery/pkg/runtime/schema"
@@ -52,11 +53,13 @@ func filterResponse(response *resourcepb.BulkResponse, resources []schema.GroupR
 }
 
 type CountValidator struct {
 	name         string
 	client       resourcepb.ResourceIndexClient
 	resource     schema.GroupResource
 	table        string
 	whereClause  string
+	driverName   string
+	legacyEngine *xorm.Engine
 }
 
 func NewCountValidator(
@@ -64,13 +67,17 @@ func NewCountValidator(
 	resource schema.GroupResource,
 	table string,
 	whereClause string,
+	driverName string,
+	legacy *xorm.Engine,
 ) Validator {
 	return &CountValidator{
 		name:         "CountValidator",
 		client:       client,
 		resource:     resource,
 		table:        table,
		whereClause:  whereClause,
+		driverName:   driverName,
+		legacyEngine: legacy,
 	}
 }
 
@@ -115,27 +122,37 @@ func (v *CountValidator) Validate(ctx context.Context, sess *xorm.Session, respo
 			return fmt.Errorf("invalid namespace %s: %w", summary.Namespace, err)
 		}
 
-		legacyCount, err := sess.Table(v.table).Where(v.whereClause, orgID).Count()
+		legacyCount, err := v.legacyEngine.Table(v.table).Where(v.whereClause, orgID).Count()
 		if err != nil {
 			return fmt.Errorf("failed to count %s: %w", v.table, err)
 		}
 
-		// Get unified storage count using GetStats API
-		statsResp, err := v.client.GetStats(ctx, &resourcepb.ResourceStatsRequest{
-			Namespace: summary.Namespace,
-			Kinds:     []string{fmt.Sprintf("%s/%s", summary.Group, summary.Resource)},
-		})
-		if err != nil {
-			return fmt.Errorf("failed to get stats for %s/%s in namespace %s: %w",
-				summary.Group, summary.Resource, summary.Namespace, err)
-		}
-
-		// Find the count for this specific resource type
 		var unifiedCount int64
-		for _, stat := range statsResp.Stats {
-			if stat.Group == summary.Group && stat.Resource == summary.Resource {
-				unifiedCount = stat.Count
-				break
+		if v.driverName == migrator.SQLite {
+			unifiedCount, err = sess.Table("resource").
+				Where("namespace = ? AND `group` = ? AND resource = ?",
+					summary.Namespace, summary.Group, summary.Resource).
+				Count()
+			if err != nil {
+				return fmt.Errorf("failed to count resource table for %s/%s in namespace %s: %w",
+					summary.Group, summary.Resource, summary.Namespace, err)
+			}
+		} else {
+			// Get unified storage count using GetStats API
+			statsResp, err := v.client.GetStats(ctx, &resourcepb.ResourceStatsRequest{
+				Namespace: summary.Namespace,
+				Kinds:     []string{fmt.Sprintf("%s/%s", summary.Group, summary.Resource)},
+			})
+			if err != nil {
+				return fmt.Errorf("failed to get stats for %s/%s in namespace %s: %w",
+					summary.Group, summary.Resource, summary.Namespace, err)
+			}
+			// Find the count for this specific resource type
+			for _, stat := range statsResp.Stats {
+				if stat.Group == summary.Group && stat.Resource == summary.Resource {
+					unifiedCount = stat.Count
+					break
+				}
 			}
 		}
 
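The split above exists because GetStats answers from the resource server, outside the migrator's still-open transaction: on SQLite the freshly written rows are not yet committed (and a second connection would hit the lock anyway), so only a query on that same session can observe them. The SQLite arm boils down to this helper, extracted here for clarity and compiling against the grafana module, with the same table and columns as in the diff:

package main

import "github.com/grafana/grafana/pkg/util/xorm"

// unifiedCountSQLite mirrors the SQLite arm of CountValidator.Validate:
// counting through the migrator's own session makes the uncommitted
// writes of the in-flight migration transaction visible.
func unifiedCountSQLite(sess *xorm.Session, ns, group, res string) (int64, error) {
	return sess.Table("resource").
		Where("namespace = ? AND `group` = ? AND resource = ?", ns, group, res).
		Count()
}

func main() {}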
@@ -162,19 +179,25 @@
 }
 
 type FolderTreeValidator struct {
 	name         string
 	client       resourcepb.ResourceIndexClient
 	resource     schema.GroupResource
+	driverName   string
+	legacyEngine *xorm.Engine
 }
 
 func NewFolderTreeValidator(
 	client resourcepb.ResourceIndexClient,
 	resource schema.GroupResource,
+	driverName string,
+	legacyEngine *xorm.Engine,
 ) Validator {
 	return &FolderTreeValidator{
 		name:         "FolderTreeValidator",
 		client:       client,
 		resource:     resource,
+		driverName:   driverName,
+		legacyEngine: legacyEngine,
 	}
 }
 
@@ -185,6 +208,12 @@ type legacyFolder struct {
 	Title string `xorm:"title"`
 }
 
+type unifiedFolder struct {
+	GUID   string `xorm:"guid"`
+	Name   string `xorm:"name"`
+	Folder string `xorm:"folder"`
+}
+
 func (v *FolderTreeValidator) Name() string {
 	return v.name
 }
@@ -212,13 +241,18 @@ func (v *FolderTreeValidator) Validate(ctx context.Context, sess *xorm.Session,
 	}
 
 	// Build legacy folder parent map
-	legacyParentMap, err := v.buildLegacyFolderParentMap(sess, orgID, log)
+	legacyParentMap, err := v.buildLegacyFolderParentMap(orgID, log)
 	if err != nil {
 		return fmt.Errorf("failed to build legacy folder parent map: %w", err)
 	}
 
 	// Build unified storage folder parent map
-	unifiedParentMap, err := v.buildUnifiedFolderParentMap(ctx, summary.Namespace, log)
+	var unifiedParentMap map[string]string
+	if v.driverName == migrator.SQLite {
+		unifiedParentMap, err = v.buildUnifiedFolderParentMapSQLite(sess, summary.Namespace, log)
+	} else {
+		unifiedParentMap, err = v.buildUnifiedFolderParentMap(ctx, summary.Namespace, log)
+	}
 	if err != nil {
 		return fmt.Errorf("failed to build unified folder parent map: %w", err)
 	}
@@ -270,10 +304,10 @@ func (v *FolderTreeValidator) Validate(ctx context.Context, sess *xorm.Session,
 	return nil
 }
 
-func (v *FolderTreeValidator) buildLegacyFolderParentMap(sess *xorm.Session, orgID int64, log log.Logger) (map[string]string, error) {
+func (v *FolderTreeValidator) buildLegacyFolderParentMap(orgID int64, log log.Logger) (map[string]string, error) {
 	// Query all folders for this org
 	var folders []legacyFolder
-	err := sess.Table("dashboard").
+	err := v.legacyEngine.Table("dashboard").
 		Cols("id", "uid", "folder_uid", "title").
 		Where("org_id = ? AND is_folder = ?", orgID, true).
 		Find(&folders)
@@ -348,3 +382,30 @@ func (v *FolderTreeValidator) buildUnifiedFolderParentMap(ctx context.Context, n
 
 	return parentMap, nil
 }
+
+func (v *FolderTreeValidator) buildUnifiedFolderParentMapSQLite(sess *xorm.Session, namespace string, log log.Logger) (map[string]string, error) {
+	var folders []unifiedFolder
+	err := sess.Table("resource").
+		Cols("guid", "name", "folder").
+		Where("namespace = ? AND resource = ?", namespace, "folder").
+		Find(&folders)
+	if err != nil {
+		return nil, fmt.Errorf("failed to query unified folders: %w", err)
+	}
+
+	parentMap := make(map[string]string)
+	for _, folder := range folders {
+		parentMap[folder.Name] = folder.Folder
+	}
+
+	if len(parentMap) == 0 {
+		log.Debug("No unified folders found for namespace", "namespace", namespace)
+		return make(map[string]string), nil
+	}
+
+	log.Debug("Built unified folder parent map",
+		"folder_count", len(parentMap),
+		"namespace", namespace)
+
+	return parentMap, nil
+}
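Both builders return the same shape, folder identifier mapped to parent identifier (empty string for a root folder), which keeps the legacy/unified comparison a plain map walk. A toy sketch with invented data, roughly the check FolderTreeValidator performs once both maps exist:

package main

import "fmt"

func main() {
	legacy := map[string]string{"team-a": "", "sub": "team-a"}
	unified := map[string]string{"team-a": "", "sub": "team-a"}
	for uid, parent := range legacy {
		if got, ok := unified[uid]; !ok || got != parent {
			fmt.Printf("folder %q: parent mismatch (legacy %q, unified %q)\n", uid, parent, got)
		}
	}
	fmt.Println("tree check done")
}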
pkg/storage/unified/resource/transaction.go (new file, 25 lines)

@@ -0,0 +1,25 @@
+package resource
+
+import (
+	"context"
+	"database/sql"
+)
+
+type transactionContextKey struct{}
+
+// ContextWithTransaction returns a new context with the transaction stored directly.
+// This is used for SQLite migrations where the transaction needs to be shared
+// between the migration code and unified storage operations within the same process.
+func ContextWithTransaction(ctx context.Context, tx *sql.Tx) context.Context {
+	return context.WithValue(ctx, transactionContextKey{}, tx)
+}
+
+// TransactionFromContext retrieves the transaction from context
+func TransactionFromContext(ctx context.Context) *sql.Tx {
+	if v := ctx.Value(transactionContextKey{}); v != nil {
+		if tx, ok := v.(*sql.Tx); ok {
+			return tx
+		}
+	}
+	return nil
+}
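The new file is a plain context-keyed carrier: the unexported key type guarantees no other package can collide with the entry, and the getter degrades to nil instead of erroring. A self-contained round-trip sketch; the SQLite driver import is an assumption made for the demo:

package main

import (
	"context"
	"database/sql"
	"fmt"

	_ "github.com/mattn/go-sqlite3"
)

type transactionContextKey struct{}

func ContextWithTransaction(ctx context.Context, tx *sql.Tx) context.Context {
	return context.WithValue(ctx, transactionContextKey{}, tx)
}

func TransactionFromContext(ctx context.Context) *sql.Tx {
	if tx, ok := ctx.Value(transactionContextKey{}).(*sql.Tx); ok {
		return tx
	}
	return nil
}

func main() {
	db, _ := sql.Open("sqlite3", ":memory:")
	tx, _ := db.Begin()
	ctx := ContextWithTransaction(context.Background(), tx)
	// Deep in the call stack the storage backend can pick the same tx back
	// up instead of opening a second connection that SQLite would block.
	fmt.Println(TransactionFromContext(ctx) == tx) // true
	_ = tx.Rollback()
}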
@@ -8,6 +8,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/fullstorydev/grpchan/inprocgrpc"
 	"github.com/google/uuid"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -20,6 +21,7 @@ import (
 	"github.com/grafana/grafana/pkg/storage/unified/resource"
 	"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
 	"github.com/grafana/grafana/pkg/storage/unified/sql/db"
+	"github.com/grafana/grafana/pkg/storage/unified/sql/db/dbimpl"
 	"github.com/grafana/grafana/pkg/storage/unified/sql/dbutil"
 	"github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate"
 )
@@ -111,6 +113,19 @@ func (b *backend) ProcessBulk(ctx context.Context, setting resource.BulkSettings
 	}
 	defer b.bulkLock.Finish(setting.Collection)
 
+	// If provided, reuse the inproc transaction for SQLite
+	if clientCtx := inprocgrpc.ClientContext(ctx); clientCtx != nil && b.dialect.DialectName() == "sqlite" {
+		if externalTx := resource.TransactionFromContext(clientCtx); externalTx != nil {
+			b.log.Info("Using SQLite transaction from client context")
+			rsp := &resourcepb.BulkResponse{}
+			err := b.processBulkWithTx(ctx, dbimpl.NewTx(externalTx), setting, iter, rsp)
+			if err != nil {
+				rsp.Error = resource.AsErrorResult(err)
+			}
+			return rsp
+		}
+	}
+
 	// We may want to first write parquet, then read parquet
 	if b.dialect.DialectName() == "sqlite" {
 		file, err := os.CreateTemp("", "grafana-bulk-export-*.parquet")
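The guard added above composes two conditions: the request must have arrived over grpchan's in-process channel (for a network call inprocgrpc.ClientContext returns nil, and there is no shared process to pull a transaction from), and the backend dialect must be SQLite. Distilled into one helper, with a stand-in context key instead of the resource package:

package main

import (
	"context"
	"database/sql"

	"github.com/fullstorydev/grpchan/inprocgrpc"
)

type txKey struct{}

// pickTransaction mirrors the fast path added to ProcessBulk: a shared
// *sql.Tx can only come from an in-process caller on the same SQLite file.
func pickTransaction(ctx context.Context, dialect string) *sql.Tx {
	clientCtx := inprocgrpc.ClientContext(ctx)
	if clientCtx == nil || dialect != "sqlite" {
		return nil // network call or non-SQLite backend: use our own tx
	}
	tx, _ := clientCtx.Value(txKey{}).(*sql.Tx)
	return tx
}

func main() {
	// Outside an in-process gRPC handler ClientContext yields nil,
	// so the fallback path is taken.
	println(pickTransaction(context.Background(), "sqlite") == nil) // true
}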
@@ -151,109 +166,134 @@ func (b *backend) ProcessBulk(ctx context.Context, setting resource.BulkSettings
|
|||||||
func (b *backend) processBulk(ctx context.Context, setting resource.BulkSettings, iter resource.BulkRequestIterator) *resourcepb.BulkResponse {
|
func (b *backend) processBulk(ctx context.Context, setting resource.BulkSettings, iter resource.BulkRequestIterator) *resourcepb.BulkResponse {
|
||||||
rsp := &resourcepb.BulkResponse{}
|
rsp := &resourcepb.BulkResponse{}
|
||||||
err := b.db.WithTx(ctx, ReadCommitted, func(ctx context.Context, tx db.Tx) error {
|
err := b.db.WithTx(ctx, ReadCommitted, func(ctx context.Context, tx db.Tx) error {
|
||||||
rollbackWithError := func(err error) error {
|
return b.processBulkWithTx(ctx, tx, setting, iter, rsp)
|
||||||
txerr := tx.Rollback()
|
})
|
||||||
if txerr != nil {
|
if err != nil {
|
||||||
b.log.Warn("rollback", "error", txerr)
|
rsp.Error = resource.AsErrorResult(err)
|
||||||
} else {
|
}
|
||||||
b.log.Info("rollback")
|
return rsp
|
||||||
|
}
|
||||||
|
|
||||||
|
// processBulkWithTx performs the bulk operation using the provided transaction.
|
||||||
|
// This is used both when creating our own transaction and when reusing an external one.
|
||||||
|
func (b *backend) processBulkWithTx(ctx context.Context, tx db.Tx, setting resource.BulkSettings, iter resource.BulkRequestIterator, rsp *resourcepb.BulkResponse) error {
|
||||||
|
rollbackWithError := func(err error) error {
|
||||||
|
txerr := tx.Rollback()
|
||||||
|
if txerr != nil {
|
||||||
|
b.log.Warn("rollback", "error", txerr)
|
||||||
|
} else {
|
||||||
|
b.log.Info("rollback")
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
bulk := &bulkWroker{
|
||||||
|
ctx: ctx,
|
||||||
|
tx: tx,
|
||||||
|
dialect: b.dialect,
|
||||||
|
logger: logging.FromContext(ctx),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Calculate the RV based on incoming request timestamps
|
||||||
|
rv := newBulkRV()
|
||||||
|
|
||||||
|
summaries := make(map[string]*resourcepb.BulkResponse_Summary, len(setting.Collection))
|
||||||
|
|
||||||
|
// First clear everything in the transaction
|
||||||
|
if setting.RebuildCollection {
|
||||||
|
for _, key := range setting.Collection {
|
||||||
|
summary, err := bulk.deleteCollection(key)
|
||||||
|
if err != nil {
|
||||||
|
return rollbackWithError(err)
|
||||||
}
|
}
|
||||||
|
summaries[resource.NSGR(key)] = summary
|
||||||
|
rsp.Summary = append(rsp.Summary, summary)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
for _, key := range setting.Collection {
|
||||||
|
summaries[resource.NSGR(key)] = &resourcepb.BulkResponse_Summary{
|
||||||
|
Namespace: key.Namespace,
|
||||||
|
Group: key.Group,
|
||||||
|
Resource: key.Resource,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
obj := &unstructured.Unstructured{}
|
||||||
|
|
||||||
|
// Write each event into the history
|
||||||
|
for iter.Next() {
|
||||||
|
if iter.RollbackRequested() {
|
||||||
|
return rollbackWithError(nil)
|
||||||
|
}
|
||||||
|
req := iter.Request()
|
||||||
|
if req == nil {
|
||||||
|
return rollbackWithError(fmt.Errorf("missing request"))
|
||||||
|
}
|
||||||
|
rsp.Processed++
|
||||||
|
|
||||||
|
if req.Action == resourcepb.BulkRequest_UNKNOWN {
|
||||||
|
rsp.Rejected = append(rsp.Rejected, &resourcepb.BulkResponse_Rejected{
|
||||||
|
Key: req.Key,
|
||||||
|
Action: req.Action,
|
||||||
|
Error: "unknown action",
|
||||||
|
})
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
err := obj.UnmarshalJSON(req.Value)
|
||||||
|
if err != nil {
|
||||||
|
rsp.Rejected = append(rsp.Rejected, &resourcepb.BulkResponse_Rejected{
|
||||||
|
Key: req.Key,
|
||||||
|
Action: req.Action,
|
||||||
|
Error: "unable to unmarshal json",
|
||||||
|
})
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write the event to history
|
||||||
|
if _, err := dbutil.Exec(ctx, tx, sqlResourceHistoryInsert, sqlResourceRequest{
|
||||||
|
SQLTemplate: sqltemplate.New(b.dialect),
|
||||||
|
WriteEvent: resource.WriteEvent{
|
||||||
|
Key: req.Key,
|
||||||
|
Type: resourcepb.WatchEvent_Type(req.Action),
|
||||||
|
Value: req.Value,
|
||||||
|
PreviousRV: -1, // Used for WATCH, but we want to skip watch events
|
||||||
|
},
|
||||||
|
Folder: req.Folder,
|
||||||
|
GUID: uuid.New().String(),
|
||||||
|
ResourceVersion: rv.next(obj),
|
||||||
|
}); err != nil {
|
||||||
|
return rollbackWithError(fmt.Errorf("insert into resource history: %w", err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now update the resource table from history
|
||||||
|
for _, key := range setting.Collection {
|
||||||
|
k := fmt.Sprintf("%s/%s/%s", key.Namespace, key.Group, key.Resource)
|
||||||
|
summary := summaries[k]
|
||||||
|
if summary == nil {
|
||||||
|
return rollbackWithError(fmt.Errorf("missing summary key for: %s", k))
|
||||||
|
}
|
||||||
|
|
||||||
|
err := bulk.syncCollection(key, summary)
|
||||||
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
bulk := &bulkWroker{
|
|
||||||
ctx: ctx,
|
|
||||||
tx: tx,
|
|
||||||
dialect: b.dialect,
|
|
||||||
logger: logging.FromContext(ctx),
|
|
||||||
}
|
|
||||||
|
|
||||||
// Calculate the RV based on incoming request timestamps
|
if b.dialect.DialectName() == "sqlite" {
|
||||||
rv := newBulkRV()
|
nextRV, err := b.rvManager.lock(ctx, tx, key.Group, key.Resource)
|
||||||
|
if err != nil {
|
||||||
summaries := make(map[string]*resourcepb.BulkResponse_Summary, len(setting.Collection))
|
b.log.Error("error locking RV", "error", err, "key", resource.NSGR(key))
|
||||||
|
} else {
|
||||||
// First clear everything in the transaction
|
b.log.Info("successfully locked RV", "nextRV", nextRV, "key", resource.NSGR(key))
|
||||||
if setting.RebuildCollection {
|
// Save the incremented RV
|
||||||
for _, key := range setting.Collection {
|
if err := b.rvManager.saveRV(ctx, tx, key.Group, key.Resource, nextRV); err != nil {
|
||||||
summary, err := bulk.deleteCollection(key)
|
b.log.Error("error saving RV", "error", err, "key", resource.NSGR(key))
|
||||||
if err != nil {
|
} else {
|
||||||
return rollbackWithError(err)
|
b.log.Info("successfully saved RV", "rv", nextRV, "key", resource.NSGR(key))
|
||||||
}
|
}
|
||||||
summaries[resource.NSGR(key)] = summary
|
|
||||||
rsp.Summary = append(rsp.Summary, summary)
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
for _, key := range setting.Collection {
|
|
||||||
summaries[resource.NSGR(key)] = &resourcepb.BulkResponse_Summary{
|
|
||||||
Namespace: key.Namespace,
|
|
||||||
Group: key.Group,
|
|
||||||
Resource: key.Resource,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
obj := &unstructured.Unstructured{}
|
|
||||||
|
|
||||||
// Write each event into the history
|
|
||||||
for iter.Next() {
|
|
||||||
if iter.RollbackRequested() {
|
|
||||||
return rollbackWithError(nil)
|
|
||||||
}
|
|
||||||
req := iter.Request()
|
|
||||||
if req == nil {
|
|
||||||
return rollbackWithError(fmt.Errorf("missing request"))
|
|
||||||
}
|
|
||||||
rsp.Processed++
|
|
||||||
|
|
||||||
if req.Action == resourcepb.BulkRequest_UNKNOWN {
|
|
||||||
rsp.Rejected = append(rsp.Rejected, &resourcepb.BulkResponse_Rejected{
|
|
||||||
Key: req.Key,
|
|
||||||
Action: req.Action,
|
|
||||||
Error: "unknown action",
|
|
||||||
})
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
err := obj.UnmarshalJSON(req.Value)
|
|
||||||
if err != nil {
|
|
||||||
rsp.Rejected = append(rsp.Rejected, &resourcepb.BulkResponse_Rejected{
|
|
||||||
Key: req.Key,
|
|
||||||
Action: req.Action,
|
|
||||||
Error: "unable to unmarshal json",
|
|
||||||
})
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// Write the event to history
|
|
||||||
if _, err := dbutil.Exec(ctx, tx, sqlResourceHistoryInsert, sqlResourceRequest{
|
|
||||||
SQLTemplate: sqltemplate.New(b.dialect),
|
|
||||||
WriteEvent: resource.WriteEvent{
|
|
||||||
Key: req.Key,
|
|
||||||
Type: resourcepb.WatchEvent_Type(req.Action),
|
|
||||||
Value: req.Value,
|
|
||||||
PreviousRV: -1, // Used for WATCH, but we want to skip watch events
|
|
||||||
},
|
|
||||||
Folder: req.Folder,
|
|
||||||
GUID: uuid.New().String(),
|
|
||||||
ResourceVersion: rv.next(obj),
|
|
||||||
}); err != nil {
|
|
||||||
return rollbackWithError(fmt.Errorf("insert into resource history: %w", err))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Now update the resource table from history
|
|
||||||
for _, key := range setting.Collection {
|
|
||||||
k := fmt.Sprintf("%s/%s/%s", key.Namespace, key.Group, key.Resource)
|
|
||||||
summary := summaries[k]
|
|
||||||
if summary == nil {
|
|
||||||
return rollbackWithError(fmt.Errorf("missing summary key for: %s", k))
|
|
||||||
}
|
|
||||||
|
|
||||||
err := bulk.syncCollection(key, summary)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Make sure the collection RV is above our last written event
|
// Make sure the collection RV is above our last written event
|
||||||
_, err = b.rvManager.ExecWithRV(ctx, key, func(tx db.Tx) (string, error) {
|
_, err = b.rvManager.ExecWithRV(ctx, key, func(tx db.Tx) (string, error) {
|
||||||
return "", nil
|
return "", nil
|
||||||
@@ -261,19 +301,15 @@ func (b *backend) processBulk(ctx context.Context, setting resource.BulkSettings
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
b.log.Warn("error increasing RV", "error", err)
|
b.log.Warn("error increasing RV", "error", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update the last import time. This is important to trigger reindexing
|
|
||||||
// of the resource for a given namespace.
|
|
||||||
if err := b.updateLastImportTime(ctx, tx, key, time.Now()); err != nil {
|
|
||||||
return rollbackWithError(err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return nil
|
|
||||||
})
|
// Update the last import time. This is important to trigger reindexing
|
||||||
if err != nil {
|
// of the resource for a given namespace.
|
||||||
rsp.Error = resource.AsErrorResult(err)
|
if err := b.updateLastImportTime(ctx, tx, key, time.Now()); err != nil {
|
||||||
|
return rollbackWithError(err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return rsp
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *backend) updateLastImportTime(ctx context.Context, tx db.Tx, key *resourcepb.ResourceKey, now time.Time) error {
|
func (b *backend) updateLastImportTime(ctx context.Context, tx db.Tx, key *resourcepb.ResourceKey, now time.Time) error {
|
||||||
@@ -48,6 +48,11 @@ type sqlTx struct {
 	*sql.Tx
 }
 
+// NewTx wraps an existing *sql.Tx with sqlTx
+func NewTx(tx *sql.Tx) db.Tx {
+	return sqlTx{tx}
+}
+
 func (tx sqlTx) QueryContext(ctx context.Context, query string, args ...any) (db.Rows, error) {
 	// // codeql-suppress go/sql-query-built-from-user-controlled-sources "The query comes from a safe template source
 	// and the parameters are passed as arguments."
@@ -103,6 +103,11 @@ func (p *resourceDBProvider) Init(ctx context.Context) (db.DB, error) {
 	return p.resourceDB, p.initErr
 }
 
+// GetEngine returns the underlying xorm.Engine for data migrations.
+func (p *resourceDBProvider) GetEngine() *xorm.Engine {
+	return p.engine
+}
+
 func (p *resourceDBProvider) initDB(ctx context.Context) (db.DB, error) {
 	p.log.Info("Initializing Resource DB",
 		"db_type",
@@ -4,6 +4,8 @@ import (
 	"context"
 	"database/sql"
 	"fmt"
+
+	"github.com/grafana/grafana/pkg/util/xorm"
 )
 
 //go:generate mockery --with-expecter --name DB
@@ -19,6 +21,13 @@ type DBProvider interface {
 	Init(context.Context) (DB, error)
 }
 
+// EngineProvider provides access to the underlying xorm.Engine.
+type EngineProvider interface {
+	// GetEngine returns the underlying xorm.Engine.
+	// It can be used to run data migrations to unified storage.
+	GetEngine() *xorm.Engine
+}
+
 // DB is a thin abstraction on *sql.DB to allow mocking to provide better unit
 // testing. We purposefully hide database operation methods that would use
 // context.Background().
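EngineProvider is deliberately kept separate from DBProvider so that only callers that truly need raw engine access (the data migrations) probe for it, via a runtime type assertion as in newClient earlier. A minimal self-contained sketch of that optional-interface upgrade, with stand-in types rather than the real grafana packages:

package main

import "fmt"

type Engine struct{ name string }

type DBProvider interface{ Init() error }

type EngineProvider interface{ GetEngine() *Engine }

type sqlProvider struct{ engine *Engine }

func (p *sqlProvider) Init() error        { return nil }
func (p *sqlProvider) GetEngine() *Engine { return p.engine }

func main() {
	var dbp DBProvider = &sqlProvider{engine: &Engine{name: "sqlite"}}
	// The concrete type decides whether the upgrade succeeds.
	if ep, ok := dbp.(EngineProvider); ok {
		fmt.Println("engine:", ep.GetEngine().name)
	} else {
		fmt.Println("no engine available (e.g. a mock or remote provider)")
	}
}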
@@ -19,6 +19,7 @@ import (
 	"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
 	"github.com/grafana/grafana/pkg/setting"
 	"github.com/grafana/grafana/pkg/storage/unified/resource"
+	"github.com/grafana/grafana/pkg/storage/unified/sql/db"
 	"github.com/grafana/grafana/pkg/storage/unified/sql/db/dbimpl"
 )
 
@@ -32,6 +33,7 @@ type QOSEnqueueDequeuer interface {
 type ServerOptions struct {
 	Backend    resource.StorageBackend
 	DB         infraDB.DB
+	DBProvider db.DBProvider // If provided, use this instead of creating a new one
 	Cfg        *setting.Cfg
 	Tracer     trace.Tracer
 	Reg        prometheus.Registerer
@@ -91,9 +93,14 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
 		serverOptions.Backend = opts.Backend
 		// TODO: we should probably have a proper interface for diagnostics/lifecycle
 	} else {
-		eDB, err := dbimpl.ProvideResourceDB(opts.DB, opts.Cfg, opts.Tracer)
-		if err != nil {
-			return nil, err
+		// Use provided DBProvider if available, otherwise create a new one
+		eDB := opts.DBProvider
+		if eDB == nil {
+			var err error
+			eDB, err = dbimpl.ProvideResourceDB(opts.DB, opts.Cfg, opts.Tracer)
+			if err != nil {
+				return nil, err
+			}
 		}
 
 		isHA := isHighAvailabilityEnabled(opts.Cfg.SectionWithEnvOverrides("database"),
@@ -7,6 +7,7 @@ package xorm
 import (
 	"context"
 	"database/sql"
+	"errors"
 	"fmt"
 	"hash/crc32"
 	"reflect"
@@ -43,7 +44,7 @@ type Session struct {
 	afterProcessors []executedProcessor
 
 	prepareStmt bool
-	stmtCache   map[uint32]*core.Stmt //key: hash.Hash32 of (queryStr, len(queryStr))
+	stmtCache   map[uint32]*core.Stmt // key: hash.Hash32 of (queryStr, len(queryStr))
 
 	// !evalphobia! stored the last executed query on this session
 	lastSQL string
@@ -236,6 +237,14 @@ func (session *Session) DB() *core.DB {
 	return session.db
 }
 
+// Tx returns the underlying transaction
+func (session *Session) Tx() (*core.Tx, error) {
+	if session.tx == nil {
+		return nil, errors.New("no open transaction")
+	}
+	return session.tx, nil
+}
+
 func cleanupProcessorsClosures(slices *[]func(any)) {
 	if len(*slices) > 0 {
 		*slices = make([]func(any), 0)