Compare commits

...

10 Commits

Author SHA1 Message Date
Rafael Paulovic
09b493ecd2 chore: fix some tests 2026-01-15 17:36:24 +01:00
Rafael Paulovic
0f73ba4d40 chore: move separation up to module_server.go
- support old target as before until we change search deployment
2026-01-15 15:04:46 +01:00
Rafael Paulovic
c8da64e4eb chore: separate resource service into search and storage 2026-01-15 00:49:08 +01:00
Rafael Paulovic
2fab497c18 chore: use only needed methods in storage interface
- continue cleanup and separation
2026-01-14 21:14:24 +01:00
Rafael Paulovic
b0bb71f834 fix: wire 2026-01-14 19:23:10 +01:00
mayor
a7aa55f908 Add search-server target and configurable search mode
- Add SearchServer module target for standalone search service
- Add search_mode config: "", "embedded" (default), "remote"
- Add search_server_address config for remote search server
- Create remote_search.go: gRPC client wrapper for remote search
- Create search_service.go: standalone search gRPC service
- Modify service.go: conditional search mode handling
- Backward compatible: empty/embedded mode works as before

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-14 17:49:03 +01:00
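A minimal sketch of the mode selection this commit describes, assuming hypothetical SearchMode and SearchServerAddress config fields and hypothetical constructor helpers; the real handling lives in service.go and remote_search.go.

    // Illustrative only: the mode values follow the commit message ("", "embedded", "remote");
    // field and helper names are assumptions, not the actual Grafana code.
    func newSearchClient(cfg *setting.Cfg) (resource.SearchClient, error) {
        switch cfg.SearchMode {
        case "", "embedded":
            // Backward compatible: search keeps running in-process.
            return newEmbeddedSearchClient(cfg)
        case "remote":
            // Dial the standalone search service over gRPC (wrapped by remote_search.go).
            return newRemoteSearchClient(cfg.SearchServerAddress)
        default:
            return nil, fmt.Errorf("unsupported search_mode %q", cfg.SearchMode)
        }
    }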
mayor
0a61846b5e Initialize backend before search server
The backend's Init() must be called before search.Init() because
buildIndexes() calls storage.GetResourceStats() which requires
the database connection to be established.

Previously, backend.Init() was called by ResourceServer.Init()
through the Lifecycle hooks, but now search.Init() runs before
ResourceServer is created.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-14 16:58:30 +01:00
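A hedged sketch of the ordering constraint this message describes, using the constructors that appear later in this diff; the surrounding wiring is assumed.

    // ProvideStorageBackend creates and Init()s the backend, so GetResourceStats()
    // is safe once it returns; buildIndexes() relies on that inside NewSearchServer.
    backend, err := sql.ProvideStorageBackend(cfg, db, tracer, reg, storageMetrics)
    if err != nil {
        return nil, err
    }
    searchServer, err := sql.NewSearchServer(sql.SearchServerOptions{
        Backend:       backend, // already initialized
        Cfg:           cfg,
        Tracer:        tracer,
        SearchOptions: searchOpts,
    })
    if err != nil {
        return nil, err
    }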
mayor
785cc739ee Fix nil pointer in NewSearchServer - use serverOptions.Backend
The backend may be created inside the function and assigned to
serverOptions.Backend, not opts.Backend. Using opts.Backend caused
a nil pointer dereference in searchSupport.buildIndexes.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-14 16:22:42 +01:00
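An illustrative reduction of the bug described above; every identifier here is a placeholder rather than the actual function body.

    // The backend may be built inside the constructor and stored on the local
    // options struct rather than on the caller's opts:
    if serverOptions.Backend == nil {
        serverOptions.Backend = newBackend() // hypothetical helper
    }
    // Passing opts.Backend (possibly still nil) into the search support then
    // panics inside searchSupport.buildIndexes; the fix passes
    // serverOptions.Backend instead.
    search, err := newSearchSupport(searchOpts, serverOptions.Backend, access, blob, indexMetrics, ownsIndexFn)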
mayor
4913baaf04 Fix compilation errors from SearchServer extraction
- Update NewLocalResourceClient to accept SearchServer parameter
- Add SearchClient methods to ResourceClient interface (client needs both)
- Fix directResourceClient to implement new interface with stub methods
- Update search_and_storage.go test to create SearchServer separately

This continues the work of extracting SearchServer from ResourceServer
to enable independent usage by storage.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-12 17:59:46 +01:00
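The client-side effect of the extraction, taken from call sites changed later in this diff: search is now passed to the in-process client separately, and nil keeps the old storage-only behaviour.

    // Before: the in-proc client was built from the ResourceServer alone.
    //   client := resource.NewLocalResourceClient(server)
    // After: the SearchServer is supplied separately; nil means no embedded search.
    client := resource.NewLocalResourceClient(storageServer, nil)
    // When an embedded SearchServer exists, both are wired into a single client:
    client = resource.NewLocalResourceClient(storageServer, searchServer)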
Peter Štibraný
c8f1efe7c7 Initial prototype extracting SearchServer from ResourceServer. 2026-01-12 17:50:39 +01:00
27 changed files with 1054 additions and 305 deletions

View File

@@ -74,7 +74,7 @@ func ToUnifiedStorage(c utils.CommandLine, cfg *setting.Cfg, sqlStore db.DB) err
return err
}
grpcClient, err := newUnifiedClient(cfg, sqlStore, featureToggles)
grpcClient, err := newUnifiedMigratorClient(cfg, sqlStore, featureToggles)
if err != nil {
return err
}
@@ -92,7 +92,7 @@ func ToUnifiedStorage(c utils.CommandLine, cfg *setting.Cfg, sqlStore db.DB) err
return runInteractiveMigration(ctx, cfg, opts, dashboardAccess, grpcClient, start)
}
func runNonInteractiveMigration(ctx context.Context, opts legacy.MigrateOptions, dashboardAccess legacy.MigrationDashboardAccessor, grpcClient resource.ResourceClient, start time.Time) error {
func runNonInteractiveMigration(ctx context.Context, opts legacy.MigrateOptions, dashboardAccess legacy.MigrationDashboardAccessor, grpcClient resource.MigratorClient, start time.Time) error {
migrator := migrations.ProvideUnifiedMigrator(dashboardAccess, grpcClient)
opts.WithHistory = true // always include history in non-interactive mode
@@ -109,7 +109,7 @@ func runNonInteractiveMigration(ctx context.Context, opts legacy.MigrateOptions,
return nil
}
func runInteractiveMigration(ctx context.Context, cfg *setting.Cfg, opts legacy.MigrateOptions, dashboardAccess legacy.MigrationDashboardAccessor, grpcClient resource.ResourceClient, start time.Time) error {
func runInteractiveMigration(ctx context.Context, cfg *setting.Cfg, opts legacy.MigrateOptions, dashboardAccess legacy.MigrationDashboardAccessor, grpcClient resource.MigratorClient, start time.Time) error {
yes, err := promptYesNo(fmt.Sprintf("Count legacy resources for namespace: %s?", opts.Namespace))
if err != nil {
return err
@@ -225,7 +225,7 @@ func promptYesNo(prompt string) (bool, error) {
}
}
func newUnifiedClient(cfg *setting.Cfg, sqlStore db.DB, featureToggles featuremgmt.FeatureToggles) (resource.ResourceClient, error) {
func newUnifiedMigratorClient(cfg *setting.Cfg, sqlStore db.DB, featureToggles featuremgmt.FeatureToggles) (resource.MigratorClient, error) {
return unified.ProvideUnifiedStorageClient(&unified.Options{
Cfg: cfg,
Features: featureToggles,

View File

@@ -10,6 +10,7 @@ const (
SearchServerRing string = "search-server-ring"
SearchServerDistributor string = "search-server-distributor"
StorageServer string = "storage-server"
SearchServer string = "search-server"
ZanzanaServer string = "zanzana-server"
InstrumentationServer string = "instrumentation-server"
FrontendServer string = "frontend-server"
@@ -17,10 +18,12 @@ const (
)
var dependencyMap = map[string][]string{
MemberlistKV: {InstrumentationServer},
SearchServerRing: {InstrumentationServer, MemberlistKV},
GrafanaAPIServer: {InstrumentationServer},
MemberlistKV: {InstrumentationServer},
SearchServerRing: {InstrumentationServer, MemberlistKV},
GrafanaAPIServer: {InstrumentationServer},
// TODO: remove SearchServerRing once split search is done
StorageServer: {InstrumentationServer, SearchServerRing},
SearchServer: {InstrumentationServer, SearchServerRing},
ZanzanaServer: {InstrumentationServer},
SearchServerDistributor: {InstrumentationServer, MemberlistKV, SearchServerRing},
Core: {},

View File

@@ -11,91 +11,59 @@ import (
)
var (
_ resource.ResourceClient = (*directResourceClient)(nil)
_ resource.StorageClient = (*DirectStorageClient)(nil)
)
// The direct client passes requests directly to the server using the *same* context
func NewDirectResourceClient(server resource.ResourceServer) resource.ResourceClient {
return &directResourceClient{server}
// NewDirectStorageClient creates a client that passes requests directly to the server using the *same* context
func NewDirectStorageClient(server resource.ResourceServer) *DirectStorageClient {
return &DirectStorageClient{server}
}
type directResourceClient struct {
type DirectStorageClient struct {
server resource.ResourceServer
}
// Create implements ResourceClient.
func (d *directResourceClient) Create(ctx context.Context, in *resourcepb.CreateRequest, opts ...grpc.CallOption) (*resourcepb.CreateResponse, error) {
func (d *DirectStorageClient) Create(ctx context.Context, in *resourcepb.CreateRequest, _ ...grpc.CallOption) (*resourcepb.CreateResponse, error) {
return d.server.Create(ctx, in)
}
// Delete implements ResourceClient.
func (d *directResourceClient) Delete(ctx context.Context, in *resourcepb.DeleteRequest, opts ...grpc.CallOption) (*resourcepb.DeleteResponse, error) {
func (d *DirectStorageClient) Delete(ctx context.Context, in *resourcepb.DeleteRequest, _ ...grpc.CallOption) (*resourcepb.DeleteResponse, error) {
return d.server.Delete(ctx, in)
}
// GetBlob implements ResourceClient.
func (d *directResourceClient) GetBlob(ctx context.Context, in *resourcepb.GetBlobRequest, opts ...grpc.CallOption) (*resourcepb.GetBlobResponse, error) {
func (d *DirectStorageClient) GetBlob(ctx context.Context, in *resourcepb.GetBlobRequest, _ ...grpc.CallOption) (*resourcepb.GetBlobResponse, error) {
return d.server.GetBlob(ctx, in)
}
// GetStats implements ResourceClient.
func (d *directResourceClient) GetStats(ctx context.Context, in *resourcepb.ResourceStatsRequest, opts ...grpc.CallOption) (*resourcepb.ResourceStatsResponse, error) {
return d.server.GetStats(ctx, in)
}
// IsHealthy implements ResourceClient.
func (d *directResourceClient) IsHealthy(ctx context.Context, in *resourcepb.HealthCheckRequest, opts ...grpc.CallOption) (*resourcepb.HealthCheckResponse, error) {
func (d *DirectStorageClient) IsHealthy(ctx context.Context, in *resourcepb.HealthCheckRequest, _ ...grpc.CallOption) (*resourcepb.HealthCheckResponse, error) {
return d.server.IsHealthy(ctx, in)
}
// List implements ResourceClient.
func (d *directResourceClient) List(ctx context.Context, in *resourcepb.ListRequest, opts ...grpc.CallOption) (*resourcepb.ListResponse, error) {
func (d *DirectStorageClient) List(ctx context.Context, in *resourcepb.ListRequest, _ ...grpc.CallOption) (*resourcepb.ListResponse, error) {
return d.server.List(ctx, in)
}
func (d *directResourceClient) ListManagedObjects(ctx context.Context, in *resourcepb.ListManagedObjectsRequest, opts ...grpc.CallOption) (*resourcepb.ListManagedObjectsResponse, error) {
return d.server.ListManagedObjects(ctx, in)
}
func (d *directResourceClient) CountManagedObjects(ctx context.Context, in *resourcepb.CountManagedObjectsRequest, opts ...grpc.CallOption) (*resourcepb.CountManagedObjectsResponse, error) {
return d.server.CountManagedObjects(ctx, in)
}
// PutBlob implements ResourceClient.
func (d *directResourceClient) PutBlob(ctx context.Context, in *resourcepb.PutBlobRequest, opts ...grpc.CallOption) (*resourcepb.PutBlobResponse, error) {
func (d *DirectStorageClient) PutBlob(ctx context.Context, in *resourcepb.PutBlobRequest, _ ...grpc.CallOption) (*resourcepb.PutBlobResponse, error) {
return d.server.PutBlob(ctx, in)
}
// Read implements ResourceClient.
func (d *directResourceClient) Read(ctx context.Context, in *resourcepb.ReadRequest, opts ...grpc.CallOption) (*resourcepb.ReadResponse, error) {
func (d *DirectStorageClient) Read(ctx context.Context, in *resourcepb.ReadRequest, _ ...grpc.CallOption) (*resourcepb.ReadResponse, error) {
return d.server.Read(ctx, in)
}
// Search implements ResourceClient.
func (d *directResourceClient) Search(ctx context.Context, in *resourcepb.ResourceSearchRequest, opts ...grpc.CallOption) (*resourcepb.ResourceSearchResponse, error) {
return d.server.Search(ctx, in)
}
// Update implements ResourceClient.
func (d *directResourceClient) Update(ctx context.Context, in *resourcepb.UpdateRequest, opts ...grpc.CallOption) (*resourcepb.UpdateResponse, error) {
func (d *DirectStorageClient) Update(ctx context.Context, in *resourcepb.UpdateRequest, _ ...grpc.CallOption) (*resourcepb.UpdateResponse, error) {
return d.server.Update(ctx, in)
}
// Watch implements ResourceClient.
func (d *directResourceClient) Watch(ctx context.Context, in *resourcepb.WatchRequest, opts ...grpc.CallOption) (resourcepb.ResourceStore_WatchClient, error) {
func (d *DirectStorageClient) Watch(_ context.Context, _ *resourcepb.WatchRequest, _ ...grpc.CallOption) (resourcepb.ResourceStore_WatchClient, error) {
return nil, fmt.Errorf("watch not supported with direct resource client")
}
// BulkProcess implements resource.ResourceClient.
func (d *directResourceClient) BulkProcess(ctx context.Context, opts ...grpc.CallOption) (resourcepb.BulkStore_BulkProcessClient, error) {
return nil, fmt.Errorf("BulkProcess not supported with direct resource client")
}
// RebuildIndexes implements resource.ResourceClient.
func (b *directResourceClient) RebuildIndexes(ctx context.Context, req *resourcepb.RebuildIndexesRequest, opts ...grpc.CallOption) (*resourcepb.RebuildIndexesResponse, error) {
return nil, fmt.Errorf("not implemented")
}
func (b *directResourceClient) GetQuotaUsage(ctx context.Context, req *resourcepb.QuotaUsageRequest, opts ...grpc.CallOption) (*resourcepb.QuotaUsageResponse, error) {
return nil, fmt.Errorf("not implemented")
}

View File

@@ -11,6 +11,7 @@ import (
"k8s.io/apiserver/pkg/registry/rest"
"github.com/grafana/authlib/types"
"github.com/grafana/grafana/pkg/apimachinery/utils"
grafanaregistry "github.com/grafana/grafana/pkg/apiserver/registry/generic"
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"
@@ -40,7 +41,7 @@ func (s *DashboardStorage) NewStore(dash utils.ResourceInfo, scheme *runtime.Sch
if err != nil {
return nil, err
}
client := legacy.NewDirectResourceClient(server) // same context
client := legacy.NewDirectStorageClient(server) // same context
optsGetter := apistore.NewRESTOptionsGetterForClient(client, nil,
defaultOpts.StorageConfig.Config, nil,
)

View File

@@ -37,7 +37,6 @@ import (
"github.com/grafana/grafana/pkg/services/folder"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/storage/unified/apistore"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
)
@@ -74,7 +73,7 @@ func RegisterAPIService(cfg *setting.Cfg,
acService accesscontrol.Service,
accessClient authlib.AccessClient,
registerer prometheus.Registerer,
unified resource.ResourceClient,
unified resourcepb.ResourceIndexClient,
zanzanaClient zanzana.Client,
) *FolderAPIBuilder {
builder := &FolderAPIBuilder{
@@ -93,7 +92,7 @@ func RegisterAPIService(cfg *setting.Cfg,
return builder
}
func NewAPIService(ac authlib.AccessClient, searcher resource.ResourceClient, features featuremgmt.FeatureToggles, zanzanaClient zanzana.Client, resourcePermissionsSvc *dynamic.NamespaceableResourceInterface) *FolderAPIBuilder {
func NewAPIService(ac authlib.AccessClient, searcher resourcepb.ResourceIndexClient, features featuremgmt.FeatureToggles, zanzanaClient zanzana.Client, resourcePermissionsSvc *dynamic.NamespaceableResourceInterface) *FolderAPIBuilder {
return &FolderAPIBuilder{
features: features,
accessClient: ac,

View File

@@ -742,7 +742,7 @@ func NewLocalStore(resourceInfo utils.ResourceInfo, scheme *runtime.Scheme, defa
return nil, err
}
client := resource.NewLocalResourceClient(server)
client := resource.NewLocalResourceClient(server, nil)
optsGetter := apistore.NewRESTOptionsGetterForClient(client, nil, defaultOpts.StorageConfig.Config, nil)
store, err := grafanaregistry.NewRegistryStore(scheme, resourceInfo, optsGetter)

View File

@@ -198,11 +198,23 @@ func (s *ModuleServer) Run() error {
//}
m.RegisterModule(modules.StorageServer, func() (services.Service, error) {
// TODO: remove when we change deployment to use search server target
if s.cfg.EnableSearch {
docBuilders, err := InitializeDocumentBuilders(s.cfg)
if err != nil {
return nil, err
}
return sql.ProvideUnifiedStorageGrpcService(s.cfg, s.features, nil, s.log, s.registerer, docBuilders, s.storageMetrics, s.indexMetrics, s.searchServerRing, s.MemberlistKVConfig, s.httpServerRouter, s.storageBackend)
}
return sql.ProvideStorageService(s.cfg, s.features, nil, s.log, s.registerer, s.storageMetrics, s.storageBackend)
})
m.RegisterModule(modules.SearchServer, func() (services.Service, error) {
docBuilders, err := InitializeDocumentBuilders(s.cfg)
if err != nil {
return nil, err
}
return sql.ProvideUnifiedStorageGrpcService(s.cfg, s.features, nil, s.log, s.registerer, docBuilders, s.storageMetrics, s.indexMetrics, s.searchServerRing, s.MemberlistKVConfig, s.httpServerRouter, s.storageBackend)
return sql.ProvideUnifiedSearchGrpcService(s.cfg, s.features, nil, s.log, s.registerer, docBuilders, s.indexMetrics, s.searchServerRing, s.MemberlistKVConfig, s.storageBackend, s.httpServerRouter)
})
m.RegisterModule(modules.ZanzanaServer, func() (services.Service, error) {

View File

@@ -372,7 +372,7 @@ func initModuleServerForTest(
return testModuleServer{server: ms, grpcAddress: cfg.GRPCServer.Address, httpPort: cfg.HTTPPort, healthClient: healthClient, id: cfg.InstanceID}
}
func createBaselineServer(t *testing.T, dbType, dbConnStr string, testNamespaces []string) resource.ResourceServer {
func createBaselineServer(t *testing.T, dbType, dbConnStr string, testNamespaces []string) resource.SearchServer {
cfg := setting.NewCfg()
section, err := cfg.Raw.NewSection("database")
require.NoError(t, err)
@@ -391,17 +391,16 @@ func createBaselineServer(t *testing.T, dbType, dbConnStr string, testNamespaces
require.NoError(t, err)
searchOpts, err := search.NewSearchOptions(features, cfg, docBuilders, nil, nil)
require.NoError(t, err)
server, err := sql.NewResourceServer(sql.ServerOptions{
DB: nil,
Cfg: cfg,
Tracer: tracer,
Reg: nil,
AccessClient: nil,
SearchOptions: searchOpts,
StorageMetrics: nil,
IndexMetrics: nil,
Features: features,
QOSQueue: nil,
searchServer, err := sql.NewSearchServer(sql.SearchServerOptions{
Cfg: cfg,
Tracer: tracer,
SearchOptions: searchOpts,
})
require.NoError(t, err)
storageServer, err := sql.NewStorageServer(sql.StorageServerOptions{
Cfg: cfg,
Tracer: tracer,
Features: features,
})
require.NoError(t, err)
@@ -417,12 +416,12 @@ func createBaselineServer(t *testing.T, dbType, dbConnStr string, testNamespaces
for _, ns := range testNamespaces {
for range rand.Intn(maxPlaylistPerNamespace) + 1 {
_, err = server.Create(ctx, generatePlaylistPayload(ns))
_, err = storageServer.Create(ctx, generatePlaylistPayload(ns))
require.NoError(t, err)
}
}
return server
return searchServer
}
var counter int

pkg/server/wire_gen.go generated
View File

@@ -513,6 +513,11 @@ func Initialize(ctx context.Context, cfg *setting.Cfg, opts Options, apiOpts api
if err != nil {
return nil, err
}
storageMetrics := resource.ProvideStorageMetrics(registerer)
storageBackend, err := sql.ProvideStorageBackend(cfg, sqlStore, tracer, registerer, storageMetrics)
if err != nil {
return nil, err
}
options := &unified.Options{
Cfg: cfg,
Features: featureToggles,
@@ -522,8 +527,8 @@ func Initialize(ctx context.Context, cfg *setting.Cfg, opts Options, apiOpts api
Authzc: accessClient,
Docs: documentBuilderSupplier,
SecureValues: inlineSecureValueSupport,
Backend: storageBackend,
}
storageMetrics := resource.ProvideStorageMetrics(registerer)
bleveIndexMetrics := resource.ProvideIndexMetrics(registerer)
resourceClient, err := unified.ProvideUnifiedStorageClient(options, storageMetrics, bleveIndexMetrics)
if err != nil {
@@ -1173,6 +1178,11 @@ func InitializeForTest(ctx context.Context, t sqlutil.ITestDB, testingT interfac
if err != nil {
return nil, err
}
storageMetrics := resource.ProvideStorageMetrics(registerer)
storageBackend, err := sql.ProvideStorageBackend(cfg, sqlStore, tracer, registerer, storageMetrics)
if err != nil {
return nil, err
}
options := &unified.Options{
Cfg: cfg,
Features: featureToggles,
@@ -1182,8 +1192,8 @@ func InitializeForTest(ctx context.Context, t sqlutil.ITestDB, testingT interfac
Authzc: accessClient,
Docs: documentBuilderSupplier,
SecureValues: inlineSecureValueSupport,
Backend: storageBackend,
}
storageMetrics := resource.ProvideStorageMetrics(registerer)
bleveIndexMetrics := resource.ProvideIndexMetrics(registerer)
resourceClient, err := unified.ProvideUnifiedStorageClient(options, storageMetrics, bleveIndexMetrics)
if err != nil {
@@ -1748,7 +1758,14 @@ func InitializeModuleServer(cfg *setting.Cfg, opts Options, apiOpts api.ServerOp
hooksService := hooks.ProvideService()
ossLicensingService := licensing.ProvideService(cfg, hooksService)
moduleRegisterer := ProvideNoopModuleRegisterer()
storageBackend, err := sql.ProvideStorageBackend(cfg)
ossMigrations := migrations.ProvideOSSMigrations(featureToggles)
inProcBus := bus.ProvideBus(tracingService)
sqlStore, err := sqlstore.ProvideService(cfg, featureToggles, ossMigrations, inProcBus, tracingService)
if err != nil {
return nil, err
}
tracer := otelTracer()
storageBackend, err := sql.ProvideStorageBackend(cfg, sqlStore, tracer, registerer, storageMetrics)
if err != nil {
return nil, err
}

View File

@@ -6,6 +6,9 @@ package server
import (
"github.com/google/wire"
"github.com/grafana/grafana/pkg/bus"
"github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/services/sqlstore"
"github.com/grafana/grafana/pkg/configprovider"
"github.com/grafana/grafana/pkg/infra/metrics"
@@ -65,6 +68,7 @@ import (
"github.com/grafana/grafana/pkg/storage/legacysql"
"github.com/grafana/grafana/pkg/storage/unified"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
search2 "github.com/grafana/grafana/pkg/storage/unified/search"
"github.com/grafana/grafana/pkg/storage/unified/search/builders"
"github.com/grafana/grafana/pkg/storage/unified/sql"
@@ -145,8 +149,10 @@ var wireExtsBasicSet = wire.NewSet(
sandbox.ProvideService,
wire.Bind(new(sandbox.Sandbox), new(*sandbox.Service)),
wire.Struct(new(unified.Options), "*"),
unified.ProvideUnifiedStorageClient,
sql.ProvideStorageBackend,
unified.ProvideUnifiedStorageClient,
wire.Bind(new(resourcepb.ResourceIndexClient), new(resource.ResourceClient)),
wire.Bind(new(resource.MigratorClient), new(resource.ResourceClient)),
builder.ProvideDefaultBuildHandlerChainFuncFromBuilders,
aggregatorrunner.ProvideNoopAggregatorConfigurator,
apisregistry.WireSetExts,
@@ -195,6 +201,16 @@ var wireExtsModuleServerSet = wire.NewSet(
tracing.ProvideTracingConfig,
tracing.ProvideService,
wire.Bind(new(tracing.Tracer), new(*tracing.TracingService)),
otelTracer,
// Bus
bus.ProvideBus,
wire.Bind(new(bus.Bus), new(*bus.InProcBus)),
// Database migrations
migrations.ProvideOSSMigrations,
wire.Bind(new(registry.DatabaseMigrator), new(*migrations.OSSMigrations)),
// Database
sqlstore.ProvideService,
wire.Bind(new(db.DB), new(*sqlstore.SQLStore)),
// Unified storage
resource.ProvideStorageMetrics,
resource.ProvideIndexMetrics,

View File

@@ -26,7 +26,7 @@ var _ generic.RESTOptionsGetter = (*RESTOptionsGetter)(nil)
type StorageOptionsRegister func(gr schema.GroupResource, opts StorageOptions)
type RESTOptionsGetter struct {
client resource.ResourceClient
client resource.StorageClient
secrets secret.InlineSecureValueSupport
original storagebackend.Config
configProvider RestConfigProvider
@@ -36,7 +36,7 @@ type RESTOptionsGetter struct {
}
func NewRESTOptionsGetterForClient(
client resource.ResourceClient,
client resource.StorageClient,
secrets secret.InlineSecureValueSupport,
original storagebackend.Config,
configProvider RestConfigProvider,
@@ -79,7 +79,7 @@ func NewRESTOptionsGetterMemory(originalStorageConfig storagebackend.Config, sec
}
return NewRESTOptionsGetterForClient(
resource.NewLocalResourceClient(server),
resource.NewLocalResourceClient(server, nil),
secrets,
originalStorageConfig,
nil,
@@ -118,7 +118,7 @@ func NewRESTOptionsGetterForFileXX(path string,
}
return NewRESTOptionsGetterForClient(
resource.NewLocalResourceClient(server),
resource.NewLocalResourceClient(server, nil),
nil, // secrets
originalStorageConfig,
nil,

View File

@@ -88,7 +88,7 @@ type Storage struct {
trigger storage.IndexerFuncs
indexers *cache.Indexers
store resource.ResourceClient
store resource.StorageClient
getKey func(string) (*resourcepb.ResourceKey, error)
snowflake *snowflake.Node // used to enforce internal ids
configProvider RestConfigProvider // used for provisioning
@@ -112,7 +112,7 @@ type RestConfigProvider interface {
// NewStorage instantiates a new Storage.
func NewStorage(
config *storagebackend.ConfigForResource,
store resource.ResourceClient,
store resource.StorageClient,
keyFunc func(obj runtime.Object) (string, error),
keyParser func(key string) (*resourcepb.ResourceKey, error),
newFunc func() runtime.Object,

View File

@@ -156,7 +156,7 @@ func testSetup(t testing.TB, opts ...setupOption) (context.Context, storage.Inte
default:
t.Fatalf("unsupported storage type: %s", setupOpts.storageType)
}
client := resource.NewLocalResourceClient(server)
client := resource.NewLocalResourceClient(server, nil)
config := storagebackend.NewDefaultConfig(setupOpts.prefix, setupOpts.codec)
store, destroyFunc, err := apistore.NewStorage(

View File

@@ -21,6 +21,7 @@ import (
"github.com/grafana/dskit/grpcclient"
"github.com/grafana/dskit/middleware"
"github.com/grafana/dskit/services"
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"
infraDB "github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/infra/tracing"
@@ -45,6 +46,7 @@ type Options struct {
Authzc types.AccessClient
Docs resource.DocumentBuilderSupplier
SecureValues secrets.InlineSecureValueSupport
Backend resource.StorageBackend // Shared backend to avoid duplicate metrics registration
}
type clientMetrics struct {
@@ -66,7 +68,7 @@ func ProvideUnifiedStorageClient(opts *Options,
BlobStoreURL: apiserverCfg.Key("blob_url").MustString(""),
BlobThresholdBytes: apiserverCfg.Key("blob_threshold_bytes").MustInt(options.BlobThresholdDefault),
GrpcClientKeepaliveTime: apiserverCfg.Key("grpc_client_keepalive_time").MustDuration(0),
}, opts.Cfg, opts.Features, opts.DB, opts.Tracer, opts.Reg, opts.Authzc, opts.Docs, storageMetrics, indexMetrics, opts.SecureValues)
}, opts.Cfg, opts.Features, opts.DB, opts.Tracer, opts.Reg, opts.Authzc, opts.Docs, storageMetrics, indexMetrics, opts.SecureValues, opts.Backend)
if err == nil {
// Decide whether to disable SQL fallback stats per resource in Mode 5.
// Otherwise we would still try to query the legacy SQL database in Mode 5.
@@ -102,6 +104,7 @@ func newClient(opts options.StorageOptions,
storageMetrics *resource.StorageMetrics,
indexMetrics *resource.BleveIndexMetrics,
secure secrets.InlineSecureValueSupport,
backend resource.StorageBackend,
) (resource.ResourceClient, error) {
ctx := context.Background()
@@ -135,7 +138,7 @@ func newClient(opts options.StorageOptions,
if err != nil {
return nil, err
}
return resource.NewLocalResourceClient(server), nil
return resource.NewLocalResourceClient(server, nil), nil
case options.StorageTypeUnifiedGrpc:
if opts.Address == "" {
@@ -173,15 +176,14 @@ func newClient(opts options.StorageOptions,
return nil, err
}
serverOptions := sql.ServerOptions{
storageOptions := sql.StorageServerOptions{
Backend: backend,
DB: db,
Cfg: cfg,
Tracer: tracer,
Reg: reg,
AccessClient: authzc,
SearchOptions: searchOptions,
StorageMetrics: storageMetrics,
IndexMetrics: indexMetrics,
Features: features,
SecureValues: secure,
}
@@ -208,7 +210,7 @@ func newClient(opts options.StorageOptions,
if err != nil {
return nil, fmt.Errorf("failed to start scheduler: %w", err)
}
serverOptions.QOSQueue = queue
storageOptions.QOSQueue = queue
}
// only enable if an overrides file path is provided
@@ -220,15 +222,33 @@ func newClient(opts options.StorageOptions,
if err != nil {
return nil, err
}
serverOptions.OverridesService = overridesSvc
storageOptions.OverridesService = overridesSvc
}
server, err := sql.NewResourceServer(serverOptions)
// Create the storage server with shared backend
storageServer, err := sql.NewStorageServer(storageOptions)
if err != nil {
return nil, err
}
return resource.NewLocalResourceClient(server), nil
var searchServer resource.SearchServer
if cfg.EnableSearch {
searchServer, err = sql.NewSearchServer(sql.SearchServerOptions{
Backend: backend,
DB: db,
Cfg: cfg,
Tracer: tracer,
Reg: reg,
AccessClient: authzc,
SearchOptions: searchOptions,
IndexMetrics: indexMetrics,
})
if err != nil {
return nil, err
}
}
return resource.NewLocalResourceClient(storageServer, searchServer), nil
}
}

View File

@@ -46,6 +46,7 @@ func TestUnifiedStorageClient(t *testing.T) {
nil,
nil,
nil,
nil,
)
require.NoError(t, err)
@@ -80,6 +81,7 @@ func TestUnifiedStorageClient(t *testing.T) {
nil,
nil,
nil,
nil,
)
require.NoError(t, err)

View File

@@ -48,7 +48,7 @@ func buildCollectionSettings(opts legacy.MigrateOptions) resource.BulkSettings {
}
type resourceClientStreamProvider struct {
client resource.ResourceClient
client resource.MigratorClient
}
func (r *resourceClientStreamProvider) createStream(ctx context.Context, opts legacy.MigrateOptions) (resourcepb.BulkStore_BulkProcessClient, error) {
@@ -71,7 +71,7 @@ func (b *bulkStoreClientStreamProvider) createStream(ctx context.Context, opts l
// This can migrate Folders, Dashboards and LibraryPanels
func ProvideUnifiedMigrator(
dashboardAccess legacy.MigrationDashboardAccessor,
client resource.ResourceClient,
client resource.MigratorClient,
) UnifiedMigrator {
return newUnifiedMigrator(
dashboardAccess,

View File

@@ -31,13 +31,31 @@ import (
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
)
//go:generate mockery --name ResourceClient --structname MockResourceClient --inpackage --filename client_mock.go --with-expecter
type ResourceClient interface {
resourcepb.ResourceStoreClient
// SearchClient is used to interact with unified search
type SearchClient interface {
resourcepb.ResourceIndexClient
resourcepb.ManagedObjectIndexClient
resourcepb.BulkStoreClient
}
// StorageClient is used to interact with unified storage
type StorageClient interface {
resourcepb.ResourceStoreClient
resourcepb.BlobStoreClient
}
// MigratorClient is used to perform migrations to unified storage
type MigratorClient interface {
resourcepb.BulkStoreClient
GetStats(ctx context.Context, in *resourcepb.ResourceStatsRequest, opts ...grpc.CallOption) (*resourcepb.ResourceStatsResponse, error)
}
// ResourceClient combines all resource-related clients and should be avoided in favor of more specific interfaces when possible
//
//go:generate mockery --name ResourceClient --structname MockResourceClient --inpackage --filename client_mock.go --with-expecter
type ResourceClient interface {
StorageClient
SearchClient
MigratorClient
resourcepb.DiagnosticsClient
resourcepb.QuotasClient
}
@@ -92,16 +110,15 @@ func NewLegacyResourceClient(channel grpc.ClientConnInterface, indexChannel grpc
return newResourceClient(cc, cci)
}
func NewLocalResourceClient(server ResourceServer) ResourceClient {
func NewLocalResourceClient(server ResourceServer, searchServer SearchServer) ResourceClient {
// scenario: local in-proc
channel := &inprocgrpc.Channel{}
indexChannel := &inprocgrpc.Channel{}
tracer := otel.Tracer("github.com/grafana/grafana/pkg/storage/unified/resource")
grpcAuthInt := grpcutils.NewUnsafeAuthenticator(tracer)
for _, desc := range []*grpc.ServiceDesc{
&resourcepb.ResourceStore_ServiceDesc,
&resourcepb.ResourceIndex_ServiceDesc,
&resourcepb.ManagedObjectIndex_ServiceDesc,
&resourcepb.BlobStore_ServiceDesc,
&resourcepb.BulkStore_ServiceDesc,
&resourcepb.Diagnostics_ServiceDesc,
@@ -117,13 +134,31 @@ func NewLocalResourceClient(server ResourceServer) ResourceClient {
)
}
// Register search services on the index channel if searchServer is provided
if searchServer != nil {
for _, desc := range []*grpc.ServiceDesc{
&resourcepb.ResourceIndex_ServiceDesc,
&resourcepb.ManagedObjectIndex_ServiceDesc,
} {
indexChannel.RegisterService(
grpchan.InterceptServer(
desc,
grpcAuth.UnaryServerInterceptor(grpcAuthInt),
grpcAuth.StreamServerInterceptor(grpcAuthInt),
),
searchServer,
)
}
}
clientInt := authnlib.NewGrpcClientInterceptor(
ProvideInProcExchanger(),
authnlib.WithClientInterceptorIDTokenExtractor(idTokenExtractor),
)
cc := grpchan.InterceptClientConn(channel, clientInt.UnaryClientInterceptor, clientInt.StreamClientInterceptor)
return newResourceClient(cc, cc)
cci := grpchan.InterceptClientConn(indexChannel, clientInt.UnaryClientInterceptor, clientInt.StreamClientInterceptor)
return newResourceClient(cc, cci)
}
type RemoteResourceClientConfig struct {

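To show what the split above buys callers: code that only migrates data can depend on the narrow MigratorClient instead of the full ResourceClient, so a storage-only deployment can satisfy it without implementing search. The function below is illustrative, not part of the diff.

    // Illustrative only: GetStats plus the bulk-store stream is the whole
    // surface a migrator needs.
    func countLegacyResources(ctx context.Context, c resource.MigratorClient, namespace string) (*resourcepb.ResourceStatsResponse, error) {
        return c.GetStats(ctx, &resourcepb.ResourceStatsRequest{Namespace: namespace})
    }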
View File

@@ -127,7 +127,10 @@ type SearchBackend interface {
GetOpenIndexes() []NamespacedResource
}
// This supports indexing+search regardless of implementation
var _ SearchServer = &searchSupport{}
// This supports indexing+search regardless of implementation.
// Implements SearchServer interface.
type searchSupport struct {
log log.Logger
storage StorageBackend
@@ -160,6 +163,10 @@ var (
_ resourcepb.ManagedObjectIndexServer = (*searchSupport)(nil)
)
func NewSearchServer(opts SearchOptions, storage StorageBackend, access types.AccessClient, blob BlobSupport, indexMetrics *BleveIndexMetrics, ownsIndexFn func(key NamespacedResource) (bool, error)) (SearchServer, error) {
return newSearchSupport(opts, storage, access, blob, indexMetrics, ownsIndexFn)
}
func newSearchSupport(opts SearchOptions, storage StorageBackend, access types.AccessClient, blob BlobSupport, indexMetrics *BleveIndexMetrics, ownsIndexFn func(key NamespacedResource) (bool, error)) (support *searchSupport, err error) {
// No backend search support
if opts.Backend == nil {
@@ -598,6 +605,22 @@ func (s *searchSupport) buildIndexes(ctx context.Context) (int, error) {
return totalBatchesIndexed, nil
}
func (s *searchSupport) Init(ctx context.Context) error {
return s.init(ctx)
}
func (s *searchSupport) Stop(_ context.Context) error {
s.stop()
return nil
}
// IsHealthy implements resourcepb.DiagnosticsServer
func (s *searchSupport) IsHealthy(ctx context.Context, req *resourcepb.HealthCheckRequest) (*resourcepb.HealthCheckResponse, error) {
return &resourcepb.HealthCheckResponse{
Status: resourcepb.HealthCheckResponse_SERVING,
}, nil
}
func (s *searchSupport) init(ctx context.Context) error {
origCtx := ctx

View File

@@ -60,7 +60,7 @@ func ProvideSearchDistributorServer(cfg *setting.Cfg, features featuremgmt.Featu
}
type RingClient struct {
Client ResourceClient
Client SearchClient
grpc_health_v1.HealthClient
Conn *grpc.ClientConn
}
@@ -99,7 +99,7 @@ var (
func (ds *distributorServer) Search(ctx context.Context, r *resourcepb.ResourceSearchRequest) (*resourcepb.ResourceSearchResponse, error) {
ctx, span := ds.tracing.Start(ctx, "distributor.Search")
defer span.End()
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Options.Key.Namespace, "Search")
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Options.Key.Namespace)
if err != nil {
return nil, err
}
@@ -110,7 +110,7 @@ func (ds *distributorServer) Search(ctx context.Context, r *resourcepb.ResourceS
func (ds *distributorServer) GetStats(ctx context.Context, r *resourcepb.ResourceStatsRequest) (*resourcepb.ResourceStatsResponse, error) {
ctx, span := ds.tracing.Start(ctx, "distributor.GetStats")
defer span.End()
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace, "GetStats")
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace)
if err != nil {
return nil, err
}
@@ -215,7 +215,7 @@ func (ds *distributorServer) RebuildIndexes(ctx context.Context, r *resourcepb.R
func (ds *distributorServer) CountManagedObjects(ctx context.Context, r *resourcepb.CountManagedObjectsRequest) (*resourcepb.CountManagedObjectsResponse, error) {
ctx, span := ds.tracing.Start(ctx, "distributor.CountManagedObjects")
defer span.End()
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace, "CountManagedObjects")
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace)
if err != nil {
return nil, err
}
@@ -226,7 +226,7 @@ func (ds *distributorServer) CountManagedObjects(ctx context.Context, r *resourc
func (ds *distributorServer) ListManagedObjects(ctx context.Context, r *resourcepb.ListManagedObjectsRequest) (*resourcepb.ListManagedObjectsResponse, error) {
ctx, span := ds.tracing.Start(ctx, "distributor.ListManagedObjects")
defer span.End()
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace, "ListManagedObjects")
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace)
if err != nil {
return nil, err
}
@@ -234,7 +234,7 @@ func (ds *distributorServer) ListManagedObjects(ctx context.Context, r *resource
return client.ListManagedObjects(ctx, r)
}
func (ds *distributorServer) getClientToDistributeRequest(ctx context.Context, namespace string, methodName string) (context.Context, ResourceClient, error) {
func (ds *distributorServer) getClientToDistributeRequest(ctx context.Context, namespace string) (context.Context, SearchClient, error) {
ringHasher := fnv.New32a()
_, err := ringHasher.Write([]byte(namespace))
if err != nil {

View File

@@ -34,12 +34,18 @@ import (
var tracer = otel.Tracer("github.com/grafana/grafana/pkg/storage/unified/resource")
type SearchServer interface {
LifecycleHooks
resourcepb.ResourceIndexServer
resourcepb.ManagedObjectIndexServer
resourcepb.DiagnosticsServer
}
// ResourceServer implements all gRPC services
type ResourceServer interface {
resourcepb.ResourceStoreServer
resourcepb.BulkStoreServer
resourcepb.ResourceIndexServer
resourcepb.ManagedObjectIndexServer
resourcepb.BlobStoreServer
resourcepb.DiagnosticsServer
resourcepb.QuotasServer
@@ -221,9 +227,6 @@ type ResourceServerOptions struct {
// The blob configuration
Blob BlobConfig
// Search options
Search SearchOptions
// Quota service
OverridesService *OverridesService
@@ -251,16 +254,15 @@ type ResourceServerOptions struct {
storageMetrics *StorageMetrics
IndexMetrics *BleveIndexMetrics
// MaxPageSizeBytes is the maximum size of a page in bytes.
MaxPageSizeBytes int
// IndexMinUpdateInterval is the time to wait after a successful write operation to ensure read-after-write consistency in search.
// This config is shared with search
IndexMinUpdateInterval time.Duration
// QOSQueue is the quality of service queue used to enqueue
QOSQueue QOSEnqueuer
QOSConfig QueueConfig
OwnsIndexFn func(key NamespacedResource) (bool, error)
}
func NewResourceServer(opts ResourceServerOptions) (*server, error) {
@@ -343,23 +345,24 @@ func NewResourceServer(opts ResourceServerOptions) (*server, error) {
ctx: ctx,
cancel: cancel,
storageMetrics: opts.storageMetrics,
indexMetrics: opts.IndexMetrics,
maxPageSizeBytes: opts.MaxPageSizeBytes,
reg: opts.Reg,
queue: opts.QOSQueue,
queueConfig: opts.QOSConfig,
overridesService: opts.OverridesService,
artificialSuccessfulWriteDelay: opts.Search.IndexMinUpdateInterval,
artificialSuccessfulWriteDelay: opts.IndexMinUpdateInterval,
}
if opts.Search.Resources != nil {
var err error
s.search, err = newSearchSupport(opts.Search, s.backend, s.access, s.blob, opts.IndexMetrics, opts.OwnsIndexFn)
if err != nil {
return nil, err
/*
if opts.Search.Resources != nil {
var err error
s.search, err = newSearchSupport(opts.Search, s.backend, s.access, s.blob, opts.IndexMetrics, opts.OwnsIndexFn)
if err != nil {
return nil, err
}
}
}
*/
err := s.Init(ctx)
if err != nil {
@@ -377,7 +380,6 @@ type server struct {
backend StorageBackend
blob BlobSupport
secure secrets.InlineSecureValueSupport
search *searchSupport
diagnostics resourcepb.DiagnosticsServer
access claims.AccessClient
writeHooks WriteAccessHooks
@@ -424,11 +426,6 @@ func (s *server) Init(ctx context.Context) error {
s.initErr = s.overridesService.init(ctx)
}
// initialize the search index
if s.initErr == nil && s.search != nil {
s.initErr = s.search.init(ctx)
}
// Start watching for changes
if s.initErr == nil {
s.initErr = s.initWatcher()
@@ -453,10 +450,6 @@ func (s *server) Stop(ctx context.Context) error {
}
}
if s.search != nil {
s.search.stop()
}
if s.overridesService != nil {
if err := s.overridesService.stop(ctx); err != nil {
stopFailed = true
@@ -1372,47 +1365,6 @@ func (s *server) Watch(req *resourcepb.WatchRequest, srv resourcepb.ResourceStor
}
}
func (s *server) Search(ctx context.Context, req *resourcepb.ResourceSearchRequest) (*resourcepb.ResourceSearchResponse, error) {
if s.search == nil {
return nil, fmt.Errorf("search index not configured")
}
return s.search.Search(ctx, req)
}
// GetStats implements ResourceServer.
func (s *server) GetStats(ctx context.Context, req *resourcepb.ResourceStatsRequest) (*resourcepb.ResourceStatsResponse, error) {
if err := s.Init(ctx); err != nil {
return nil, err
}
if s.search == nil {
// If the backend implements "GetStats", we can use it
srv, ok := s.backend.(resourcepb.ResourceIndexServer)
if ok {
return srv.GetStats(ctx, req)
}
return nil, fmt.Errorf("search index not configured")
}
return s.search.GetStats(ctx, req)
}
func (s *server) ListManagedObjects(ctx context.Context, req *resourcepb.ListManagedObjectsRequest) (*resourcepb.ListManagedObjectsResponse, error) {
if s.search == nil {
return nil, fmt.Errorf("search index not configured")
}
return s.search.ListManagedObjects(ctx, req)
}
func (s *server) CountManagedObjects(ctx context.Context, req *resourcepb.CountManagedObjectsRequest) (*resourcepb.CountManagedObjectsResponse, error) {
if s.search == nil {
return nil, fmt.Errorf("search index not configured")
}
return s.search.CountManagedObjects(ctx, req)
}
// IsHealthy implements ResourceServer.
func (s *server) IsHealthy(ctx context.Context, req *resourcepb.HealthCheckRequest) (*resourcepb.HealthCheckResponse, error) {
return s.diagnostics.IsHealthy(ctx, req)
@@ -1568,14 +1520,6 @@ func (s *server) runInQueue(ctx context.Context, tenantID string, runnable func(
}
}
func (s *server) RebuildIndexes(ctx context.Context, req *resourcepb.RebuildIndexesRequest) (*resourcepb.RebuildIndexesResponse, error) {
if s.search == nil {
return nil, fmt.Errorf("search index not configured")
}
return s.search.RebuildIndexes(ctx, req)
}
func (s *server) checkQuota(ctx context.Context, nsr NamespacedResource) {
span := trace.SpanFromContext(ctx)
span.AddEvent("checkQuota", trace.WithAttributes(

View File

@@ -11,6 +11,9 @@ import (
"time"
"github.com/go-sql-driver/mysql"
infraDB "github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
"github.com/grafana/grafana/pkg/storage/unified/sql/db/dbimpl"
"github.com/jackc/pgx/v5/pgconn"
"github.com/lib/pq"
"github.com/prometheus/client_golang/prometheus"
@@ -44,10 +47,63 @@ const defaultPrunerHistoryLimit = 20
func ProvideStorageBackend(
cfg *setting.Cfg,
db infraDB.DB,
tracer trace.Tracer,
reg prometheus.Registerer,
storageMetrics *resource.StorageMetrics,
) (resource.StorageBackend, error) {
// TODO: make this the central place to provide SQL backend
// Currently it is skipped as we need to handle the cases of Diagnostics and Lifecycle
return nil, nil
// Create the resource DB
eDB, err := dbimpl.ProvideResourceDB(db, cfg, tracer)
if err != nil {
return nil, fmt.Errorf("failed to create resource DB: %w", err)
}
// Check if HA is enabled
isHA := isHighAvailabilityEnabled(
cfg.SectionWithEnvOverrides("database"),
cfg.SectionWithEnvOverrides("resource_api"),
)
// Create the backend
backend, err := NewBackend(BackendOptions{
DBProvider: eDB,
Reg: reg,
IsHA: isHA,
storageMetrics: storageMetrics,
LastImportTimeMaxAge: cfg.MaxFileIndexAge,
})
if err != nil {
return nil, fmt.Errorf("failed to create backend: %w", err)
}
// Initialize the backend
if err := backend.Init(context.Background()); err != nil {
return nil, fmt.Errorf("failed to initialize backend: %w", err)
}
return backend, nil
}
// isHighAvailabilityEnabled determines if high availability mode should
// be enabled based on database configuration. High availability is enabled
// by default except for SQLite databases.
func isHighAvailabilityEnabled(dbCfg, resourceAPICfg *setting.DynamicSection) bool {
// If the resource API is using a non-SQLite database, we assume it's in HA mode.
resourceDBType := resourceAPICfg.Key("db_type").String()
if resourceDBType != "" && resourceDBType != migrator.SQLite {
return true
}
// Check in the config if HA is enabled - by default we always assume a HA setup.
isHA := dbCfg.Key("high_availability").MustBool(true)
// SQLite is not possible to run in HA, so we force it to false.
databaseType := dbCfg.Key("type").String()
if databaseType == migrator.SQLite {
isHA = false
}
return isHA
}
type Backend interface {

View File

@@ -0,0 +1,320 @@
package sql
import (
"context"
"errors"
"fmt"
"hash/fnv"
"net/http"
"github.com/gorilla/mux"
"github.com/grafana/grafana/pkg/storage/unified/search"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc/health/grpc_health_v1"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
infraDB "github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/modules"
"github.com/grafana/grafana/pkg/services/authz"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/services/grpcserver"
"github.com/grafana/grafana/pkg/services/grpcserver/interceptors"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/resource/grpc"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
)
// operation used by the search-servers to check if they own the namespace
var (
searchOwnerRead = ring.NewOp([]ring.InstanceState{ring.JOINING, ring.ACTIVE, ring.LEAVING}, nil)
)
type searchService struct {
*services.BasicService
backend resource.StorageBackend
cfg *setting.Cfg
features featuremgmt.FeatureToggles
db infraDB.DB
stopCh chan struct{}
stoppedCh chan error
handler grpcserver.Provider
tracing trace.Tracer
authenticator func(ctx context.Context) (context.Context, error)
httpServerRouter *mux.Router
log log.Logger
reg prometheus.Registerer
docBuilders resource.DocumentBuilderSupplier
indexMetrics *resource.BleveIndexMetrics
searchRing *ring.Ring
// Ring lifecycle and sharding support
ringLifecycler *ring.BasicLifecycler
// Subservices manager
subservices *services.Manager
subservicesWatcher *services.FailureWatcher
hasSubservices bool
}
func ProvideUnifiedSearchGrpcService(
cfg *setting.Cfg,
features featuremgmt.FeatureToggles,
db infraDB.DB,
log log.Logger,
reg prometheus.Registerer,
docBuilders resource.DocumentBuilderSupplier,
indexMetrics *resource.BleveIndexMetrics,
searchRing *ring.Ring,
memberlistKVConfig kv.Config,
backend resource.StorageBackend,
httpServerRouter *mux.Router,
) (UnifiedGrpcService, error) {
var err error
tracer := otel.Tracer("unified-search-server")
authn := NewAuthenticatorWithFallback(cfg, reg, tracer, func(ctx context.Context) (context.Context, error) {
auth := grpc.Authenticator{Tracer: tracer}
return auth.Authenticate(ctx)
})
s := &searchService{
backend: backend,
cfg: cfg,
features: features,
stopCh: make(chan struct{}),
stoppedCh: make(chan error, 1),
authenticator: authn,
tracing: tracer,
db: db,
log: log,
reg: reg,
docBuilders: docBuilders,
indexMetrics: indexMetrics,
searchRing: searchRing,
httpServerRouter: httpServerRouter,
subservicesWatcher: services.NewFailureWatcher(),
}
subservices := []services.Service{}
if cfg.EnableSharding {
ringStore, err := kv.NewClient(
memberlistKVConfig,
ring.GetCodec(),
kv.RegistererWithKVName(reg, resource.RingName),
log,
)
if err != nil {
return nil, fmt.Errorf("failed to create KV store client: %s", err)
}
lifecyclerCfg, err := toLifecyclerConfig(cfg, log)
if err != nil {
return nil, fmt.Errorf("failed to initialize search-ring lifecycler config: %s", err)
}
// Define lifecycler delegates in reverse order (last to be called defined first because they're
// chained via "next delegate").
delegate := ring.BasicLifecyclerDelegate(ring.NewInstanceRegisterDelegate(ring.JOINING, resource.RingNumTokens))
delegate = ring.NewLeaveOnStoppingDelegate(delegate, log)
delegate = ring.NewAutoForgetDelegate(resource.RingHeartbeatTimeout*2, delegate, log)
s.ringLifecycler, err = ring.NewBasicLifecycler(
lifecyclerCfg,
resource.RingName,
resource.RingKey,
ringStore,
delegate,
log,
reg,
)
if err != nil {
return nil, fmt.Errorf("failed to initialize search-ring lifecycler: %s", err)
}
s.ringLifecycler.SetKeepInstanceInTheRingOnShutdown(true)
subservices = append(subservices, s.ringLifecycler)
}
if len(subservices) > 0 {
s.hasSubservices = true
s.subservices, err = services.NewManager(subservices...)
if err != nil {
return nil, fmt.Errorf("failed to create subservices manager: %w", err)
}
}
// This will be used when running as a dskit service
s.BasicService = services.NewBasicService(s.starting, s.running, s.stopping).WithName(modules.SearchServer)
// Register HTTP endpoints if router is provided
s.RegisterHTTPEndpoints(httpServerRouter)
return s, nil
}
func (s *searchService) PrepareDownscale(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case http.MethodPost:
s.log.Info("Preparing for downscale. Will not keep instance in ring on shutdown.")
s.ringLifecycler.SetKeepInstanceInTheRingOnShutdown(false)
case http.MethodDelete:
s.log.Info("Downscale canceled. Will keep instance in ring on shutdown.")
s.ringLifecycler.SetKeepInstanceInTheRingOnShutdown(true)
case http.MethodGet:
// used for the delayed downscale use case, which we don't support; left here for completeness
s.log.Info("Received GET request for prepare-downscale. Behavior not implemented.")
default:
}
}
func (s *searchService) OwnsIndex(key resource.NamespacedResource) (bool, error) {
if s.searchRing == nil {
return true, nil
}
if st := s.searchRing.State(); st != services.Running {
return false, fmt.Errorf("ring is not Running: %s", st)
}
ringHasher := fnv.New32a()
_, err := ringHasher.Write([]byte(key.Namespace))
if err != nil {
return false, fmt.Errorf("error hashing namespace: %w", err)
}
rs, err := s.searchRing.GetWithOptions(ringHasher.Sum32(), searchOwnerRead, ring.WithReplicationFactor(s.searchRing.ReplicationFactor()))
if err != nil {
return false, fmt.Errorf("error getting replicaset from ring: %w", err)
}
return rs.Includes(s.ringLifecycler.GetInstanceAddr()), nil
}
func (s *searchService) starting(ctx context.Context) error {
if s.hasSubservices {
s.subservicesWatcher.WatchManager(s.subservices)
if err := services.StartManagerAndAwaitHealthy(ctx, s.subservices); err != nil {
return fmt.Errorf("failed to start subservices: %w", err)
}
}
authzClient, err := authz.ProvideStandaloneAuthZClient(s.cfg, s.features, s.tracing, s.reg)
if err != nil {
return err
}
// Create search options for the search server
searchOptions, err := search.NewSearchOptions(s.features, s.cfg, s.docBuilders, s.indexMetrics, s.OwnsIndex)
if err != nil {
return err
}
// Create the search server
searchServer, err := NewSearchServer(SearchServerOptions{
Backend: s.backend,
DB: s.db,
Cfg: s.cfg,
Tracer: s.tracing,
Reg: s.reg,
AccessClient: authzClient,
SearchOptions: searchOptions,
IndexMetrics: s.indexMetrics,
OwnsIndexFn: s.OwnsIndex,
})
if err != nil {
return err
}
s.handler, err = grpcserver.ProvideService(s.cfg, s.features, interceptors.AuthenticatorFunc(s.authenticator), s.tracing, prometheus.DefaultRegisterer)
if err != nil {
return err
}
healthService, err := resource.ProvideHealthService(searchServer)
if err != nil {
return err
}
srv := s.handler.GetServer()
// Register search services
resourcepb.RegisterResourceIndexServer(srv, searchServer)
resourcepb.RegisterManagedObjectIndexServer(srv, searchServer)
resourcepb.RegisterDiagnosticsServer(srv, searchServer)
grpc_health_v1.RegisterHealthServer(srv, healthService)
// register reflection service
_, err = grpcserver.ProvideReflectionService(s.cfg, s.handler)
if err != nil {
return err
}
if s.cfg.EnableSharding {
s.log.Info("waiting until search server is JOINING in the ring")
lfcCtx, cancel := context.WithTimeout(context.Background(), s.cfg.ResourceServerJoinRingTimeout)
defer cancel()
if err := ring.WaitInstanceState(lfcCtx, s.searchRing, s.ringLifecycler.GetInstanceID(), ring.JOINING); err != nil {
return fmt.Errorf("error switching to JOINING in the ring: %s", err)
}
s.log.Info("search server is JOINING in the ring")
if err := s.ringLifecycler.ChangeState(ctx, ring.ACTIVE); err != nil {
return fmt.Errorf("error switching to ACTIVE in the ring: %s", err)
}
s.log.Info("search server is ACTIVE in the ring")
}
// start the gRPC server
go func() {
err := s.handler.Run(ctx)
if err != nil {
s.stoppedCh <- err
} else {
s.stoppedCh <- nil
}
}()
return nil
}
// GetAddress returns the address of the gRPC server.
func (s *searchService) GetAddress() string {
return s.handler.GetAddress()
}
func (s *searchService) running(ctx context.Context) error {
select {
case err := <-s.stoppedCh:
if err != nil && !errors.Is(err, context.Canceled) {
return err
}
case err := <-s.subservicesWatcher.Chan():
return fmt.Errorf("subservice failure: %w", err)
case <-ctx.Done():
close(s.stopCh)
}
return nil
}
func (s *searchService) stopping(_ error) error {
if s.hasSubservices {
err := services.StopManagerAndAwaitStopped(context.Background(), s.subservices)
if err != nil {
return fmt.Errorf("failed to stop subservices: %w", err)
}
}
return nil
}
func (s *searchService) RegisterHTTPEndpoints(httpServerRouter *mux.Router) {
if httpServerRouter != nil && s.cfg.EnableSharding {
httpServerRouter.Path("/prepare-downscale").Methods("GET", "POST", "DELETE").Handler(http.HandlerFunc(s.PrepareDownscale))
}
}

View File

@@ -16,7 +16,6 @@ import (
secrets "github.com/grafana/grafana/pkg/registry/apis/secret/contracts"
inlinesecurevalue "github.com/grafana/grafana/pkg/registry/apis/secret/inline"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/sql/db/dbimpl"
@@ -30,8 +29,21 @@ type QOSEnqueueDequeuer interface {
Dequeue(ctx context.Context) (func(), error)
}
// ServerOptions contains the options for creating a new ResourceServer
type ServerOptions struct {
// SearchServerOptions contains the options for creating a new SearchServer
type SearchServerOptions struct {
Backend resource.StorageBackend
DB infraDB.DB
Cfg *setting.Cfg
Tracer trace.Tracer
Reg prometheus.Registerer
AccessClient types.AccessClient
SearchOptions resource.SearchOptions
IndexMetrics *resource.BleveIndexMetrics
OwnsIndexFn func(key resource.NamespacedResource) (bool, error)
}
// StorageServerOptions contains the options for creating a storage-only server (without search)
type StorageServerOptions struct {
Backend resource.StorageBackend
OverridesService *resource.OverridesService
DB infraDB.DB
@@ -39,16 +51,66 @@ type ServerOptions struct {
Tracer trace.Tracer
Reg prometheus.Registerer
AccessClient types.AccessClient
SearchOptions resource.SearchOptions
StorageMetrics *resource.StorageMetrics
IndexMetrics *resource.BleveIndexMetrics
Features featuremgmt.FeatureToggles
QOSQueue QOSEnqueueDequeuer
SecureValues secrets.InlineSecureValueSupport
OwnsIndexFn func(key resource.NamespacedResource) (bool, error)
}
func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
// NewSearchServer creates a new SearchServer with the given options.
// This can be used to create a standalone search server or to create a search server
// that will be passed to NewResourceServer.
//
// Important: When running in monolith mode, the backend should be provided by the caller
// to avoid duplicate metrics registration. Only in standalone microservice mode should
// this function create its own backend.
func NewSearchServer(opts SearchServerOptions) (resource.SearchServer, error) {
backend := opts.Backend
if backend == nil {
eDB, err := dbimpl.ProvideResourceDB(opts.DB, opts.Cfg, opts.Tracer)
if err != nil {
return nil, err
}
isHA := isHighAvailabilityEnabled(opts.Cfg.SectionWithEnvOverrides("database"),
opts.Cfg.SectionWithEnvOverrides("resource_api"))
b, err := NewBackend(BackendOptions{
DBProvider: eDB,
Reg: opts.Reg,
IsHA: isHA,
LastImportTimeMaxAge: opts.SearchOptions.MaxIndexAge,
})
if err != nil {
return nil, err
}
// Initialize the backend before creating search server
if err := b.Init(context.Background()); err != nil {
return nil, fmt.Errorf("failed to initialize backend: %w", err)
}
backend = b
}
search, err := resource.NewSearchServer(opts.SearchOptions, backend, opts.AccessClient, nil, opts.IndexMetrics, opts.OwnsIndexFn)
if err != nil {
return nil, fmt.Errorf("failed to create search server: %w", err)
}
if err := search.Init(context.Background()); err != nil {
return nil, fmt.Errorf("failed to initialize search server: %w", err)
}
return search, nil
}
// NewStorageServer creates a storage-only server without search capabilities.
// Use this when you want to run storage and search as separate services.
//
// Important: When running in monolith mode, the backend should be provided by the caller
// to avoid duplicate metrics registration. Only in standalone microservice mode should
// this function create its own backend.
func NewStorageServer(opts StorageServerOptions) (resource.ResourceServer, error) {
apiserverCfg := opts.Cfg.SectionWithEnvOverrides("grafana-apiserver")
if opts.SecureValues == nil && opts.Cfg != nil && opts.Cfg.SecretsManagement.GrpcClientEnable {
@@ -92,7 +154,6 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
if opts.Backend != nil {
serverOptions.Backend = opts.Backend
// TODO: we should probably have a proper interface for diagnostics/lifecycle
} else {
eDB, err := dbimpl.ProvideResourceDB(opts.DB, opts.Cfg, opts.Tracer)
if err != nil {
@@ -130,7 +191,6 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
return nil, fmt.Errorf("failed to create resource version manager: %w", err)
}
// TODO add config to decide whether to pass RvManager or not
kvBackendOpts.RvManager = rvManager
kvBackend, err := resource.NewKVStorageBackend(kvBackendOpts)
if err != nil {
@@ -148,7 +208,7 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
Reg: opts.Reg,
IsHA: isHA,
storageMetrics: opts.StorageMetrics,
LastImportTimeMaxAge: opts.SearchOptions.MaxIndexAge, // No need to keep last_import_times older than max index age.
LastImportTimeMaxAge: opts.Cfg.MaxFileIndexAge,
})
if err != nil {
return nil, err
@@ -159,33 +219,15 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
}
}
serverOptions.Search = opts.SearchOptions
serverOptions.IndexMetrics = opts.IndexMetrics
// Initialize the backend before creating server
if serverOptions.Lifecycle != nil {
if err := serverOptions.Lifecycle.Init(context.Background()); err != nil {
return nil, fmt.Errorf("failed to initialize backend: %w", err)
}
}
serverOptions.QOSQueue = opts.QOSQueue
serverOptions.OwnsIndexFn = opts.OwnsIndexFn
serverOptions.OverridesService = opts.OverridesService
return resource.NewResourceServer(serverOptions)
}
// isHighAvailabilityEnabled determines if high availability mode should
// be enabled based on database configuration. High availability is enabled
// by default except for SQLite databases.
func isHighAvailabilityEnabled(dbCfg, resourceAPICfg *setting.DynamicSection) bool {
// If the resource API is using a non-SQLite database, we assume it's in HA mode.
resourceDBType := resourceAPICfg.Key("db_type").String()
if resourceDBType != "" && resourceDBType != migrator.SQLite {
return true
}
// Check in the config if HA is enabled - by default we always assume a HA setup.
isHA := dbCfg.Key("high_availability").MustBool(true)
// SQLite is not possible to run in HA, so we force it to false.
databaseType := dbCfg.Key("type").String()
if databaseType == migrator.SQLite {
isHA = false
}
return isHA
}
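
For orientation, here is a minimal sketch of microservice-mode usage of the new constructor, where Backend is left nil so NewSearchServer builds and Init()s its own backend exactly as the doc comment above describes. It assumes SearchServerOptions carries the fields the function body reads (DB, Cfg, Tracer, Reg, AccessClient, SearchOptions, IndexMetrics, OwnsIndexFn) with the obvious types; the option struct itself is defined outside this diff, and the zero-value AccessClient/IndexMetrics mirror the integration test further down in this change.

```go
// Sketch only: standalone search-server construction (no shared backend).
package unifiedexample

import (
	"github.com/prometheus/client_golang/prometheus"
	"go.opentelemetry.io/otel"

	infraDB "github.com/grafana/grafana/pkg/infra/db"
	"github.com/grafana/grafana/pkg/setting"
	"github.com/grafana/grafana/pkg/storage/unified/resource"
	"github.com/grafana/grafana/pkg/storage/unified/sql"
)

func newStandaloneSearchServer(cfg *setting.Cfg, db infraDB.DB, searchOpts resource.SearchOptions) (resource.SearchServer, error) {
	return sql.NewSearchServer(sql.SearchServerOptions{
		// Backend deliberately nil: in standalone mode NewSearchServer creates
		// and initializes its own backend, so it should be the only backend in
		// this process or metrics would be registered twice.
		DB:            db,
		Cfg:           cfg,
		Tracer:        otel.Tracer("search-server"),
		Reg:           prometheus.NewPedanticRegistry(),
		SearchOptions: searchOpts,
		// AccessClient, IndexMetrics and OwnsIndexFn left at their zero values,
		// as in the search_and_storage test later in this diff.
	})
}
```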

View File

@@ -0,0 +1,232 @@
package sql
import (
"context"
"errors"
"fmt"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc/health/grpc_health_v1"
"github.com/grafana/dskit/services"
infraDB "github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/modules"
"github.com/grafana/grafana/pkg/services/authz"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/services/grpcserver"
"github.com/grafana/grafana/pkg/services/grpcserver/interceptors"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/resource/grpc"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
"github.com/grafana/grafana/pkg/util/scheduler"
)
type storageService struct {
*services.BasicService
backend resource.StorageBackend
cfg *setting.Cfg
features featuremgmt.FeatureToggles
db infraDB.DB
stopCh chan struct{}
stoppedCh chan error
handler grpcserver.Provider
tracing trace.Tracer
authenticator func(ctx context.Context) (context.Context, error)
log log.Logger
reg prometheus.Registerer
storageMetrics *resource.StorageMetrics
queue QOSEnqueueDequeuer
scheduler *scheduler.Scheduler
// Subservices manager
subservices *services.Manager
subservicesWatcher *services.FailureWatcher
hasSubservices bool
}
func ProvideStorageService(
cfg *setting.Cfg,
features featuremgmt.FeatureToggles,
db infraDB.DB,
log log.Logger,
reg prometheus.Registerer,
storageMetrics *resource.StorageMetrics,
backend resource.StorageBackend,
) (UnifiedGrpcService, error) {
var err error
tracer := otel.Tracer("unified-storage-server")
authn := NewAuthenticatorWithFallback(cfg, reg, tracer, func(ctx context.Context) (context.Context, error) {
auth := grpc.Authenticator{Tracer: tracer}
return auth.Authenticate(ctx)
})
s := &storageService{
backend: backend,
cfg: cfg,
features: features,
stopCh: make(chan struct{}),
stoppedCh: make(chan error, 1),
authenticator: authn,
tracing: tracer,
db: db,
log: log,
reg: reg,
storageMetrics: storageMetrics,
subservicesWatcher: services.NewFailureWatcher(),
}
subservices := []services.Service{}
// Setup QOS if enabled
if cfg.QOSEnabled {
qosReg := prometheus.WrapRegistererWithPrefix("resource_server_qos_", reg)
queue := scheduler.NewQueue(&scheduler.QueueOptions{
MaxSizePerTenant: cfg.QOSMaxSizePerTenant,
Registerer: qosReg,
})
sched, err := scheduler.NewScheduler(queue, &scheduler.Config{
NumWorkers: cfg.QOSNumberWorker,
Logger: log,
})
if err != nil {
return nil, fmt.Errorf("failed to create qos scheduler: %s", err)
}
s.queue = queue
s.scheduler = sched
subservices = append(subservices, s.queue, s.scheduler)
}
if len(subservices) > 0 {
s.hasSubservices = true
s.subservices, err = services.NewManager(subservices...)
if err != nil {
return nil, fmt.Errorf("failed to create subservices manager: %w", err)
}
}
// This will be used when running as a dskit service
s.BasicService = services.NewBasicService(s.starting, s.running, s.stopping).WithName(modules.StorageServer)
return s, nil
}
func (s *storageService) starting(ctx context.Context) error {
if s.hasSubservices {
s.subservicesWatcher.WatchManager(s.subservices)
if err := services.StartManagerAndAwaitHealthy(ctx, s.subservices); err != nil {
return fmt.Errorf("failed to start subservices: %w", err)
}
}
authzClient, err := authz.ProvideStandaloneAuthZClient(s.cfg, s.features, s.tracing, s.reg)
if err != nil {
return err
}
// Setup overrides service if enabled
var overridesSvc *resource.OverridesService
if s.cfg.OverridesFilePath != "" {
overridesSvc, err = resource.NewOverridesService(context.Background(), s.log, s.reg, s.tracing, resource.ReloadOptions{
FilePath: s.cfg.OverridesFilePath,
ReloadPeriod: s.cfg.OverridesReloadInterval,
})
if err != nil {
return err
}
}
// Create the storage server
storageServer, err := NewStorageServer(StorageServerOptions{
Backend: s.backend,
OverridesService: overridesSvc,
DB: s.db,
Cfg: s.cfg,
Tracer: s.tracing,
Reg: s.reg,
AccessClient: authzClient,
StorageMetrics: s.storageMetrics,
Features: s.features,
QOSQueue: s.queue,
})
if err != nil {
return err
}
s.handler, err = grpcserver.ProvideService(s.cfg, s.features, interceptors.AuthenticatorFunc(s.authenticator), s.tracing, prometheus.DefaultRegisterer)
if err != nil {
return err
}
healthService, err := resource.ProvideHealthService(storageServer)
if err != nil {
return err
}
srv := s.handler.GetServer()
// Register storage services
resourcepb.RegisterResourceStoreServer(srv, storageServer)
resourcepb.RegisterBulkStoreServer(srv, storageServer)
resourcepb.RegisterBlobStoreServer(srv, storageServer)
resourcepb.RegisterDiagnosticsServer(srv, storageServer)
resourcepb.RegisterQuotasServer(srv, storageServer)
grpc_health_v1.RegisterHealthServer(srv, healthService)
// register reflection service
_, err = grpcserver.ProvideReflectionService(s.cfg, s.handler)
if err != nil {
return err
}
// start the gRPC server
go func() {
err := s.handler.Run(ctx)
if err != nil {
s.stoppedCh <- err
} else {
s.stoppedCh <- nil
}
}()
return nil
}
// GetAddress returns the address of the gRPC server.
func (s *storageService) GetAddress() string {
return s.handler.GetAddress()
}
func (s *storageService) running(ctx context.Context) error {
select {
case err := <-s.stoppedCh:
if err != nil && !errors.Is(err, context.Canceled) {
return err
}
case err := <-s.subservicesWatcher.Chan():
return fmt.Errorf("subservice failure: %w", err)
case <-ctx.Done():
close(s.stopCh)
}
return nil
}
func (s *storageService) stopping(_ error) error {
if s.hasSubservices {
err := services.StopManagerAndAwaitStopped(context.Background(), s.subservices)
if err != nil {
return fmt.Errorf("failed to stop subservices: %w", err)
}
}
return nil
}
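
Since the standalone storage service above is a dskit BasicService, a caller would normally drive it through dskit's lifecycle helpers. A rough sketch follows, with all inputs assumed to be wired up elsewhere (config, feature toggles, database, backend and metrics are parameters here, not things this snippet constructs):

```go
// Sketch only: running the standalone storage service via its dskit lifecycle.
package storagerunner

import (
	"context"

	"github.com/grafana/dskit/services"
	"github.com/prometheus/client_golang/prometheus"

	infraDB "github.com/grafana/grafana/pkg/infra/db"
	"github.com/grafana/grafana/pkg/infra/log"
	"github.com/grafana/grafana/pkg/services/featuremgmt"
	"github.com/grafana/grafana/pkg/setting"
	"github.com/grafana/grafana/pkg/storage/unified/resource"
	"github.com/grafana/grafana/pkg/storage/unified/sql"
)

func runStorageService(
	ctx context.Context,
	cfg *setting.Cfg,
	features featuremgmt.FeatureToggles,
	db infraDB.DB,
	logger log.Logger,
	reg prometheus.Registerer,
	storageMetrics *resource.StorageMetrics,
	backend resource.StorageBackend,
) error {
	svc, err := sql.ProvideStorageService(cfg, features, db, logger, reg, storageMetrics, backend)
	if err != nil {
		return err
	}
	// starting() builds and launches the gRPC server; running() then blocks
	// until ctx is cancelled, the server errors, or a subservice fails.
	if err := services.StartAndAwaitRunning(ctx, svc); err != nil {
		return err
	}
	logger.Info("unified storage gRPC server listening", "address", svc.GetAddress())

	// Wait for the service to finish; stopping() also shuts down the QoS
	// queue and scheduler subservices when they are enabled.
	return svc.AwaitTerminated(context.Background())
}
```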

View File

@@ -12,6 +12,7 @@ import (
"time"
"github.com/gorilla/mux"
"github.com/grafana/grafana/pkg/storage/unified/search"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.opentelemetry.io/otel"
@@ -36,15 +37,17 @@ import (
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/resource/grpc"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
"github.com/grafana/grafana/pkg/storage/unified/search"
"github.com/grafana/grafana/pkg/storage/unified/sql/db/dbimpl"
"github.com/grafana/grafana/pkg/util/scheduler"
)
var (
_ UnifiedStorageGrpcService = (*service)(nil)
_ UnifiedGrpcService = (*service)(nil)
_ UnifiedGrpcService = (*searchService)(nil)
_ UnifiedGrpcService = (*storageService)(nil)
)
type UnifiedStorageGrpcService interface {
type UnifiedGrpcService interface {
services.NamedService
// Return the address where this service is running
@@ -54,38 +57,40 @@ type UnifiedStorageGrpcService interface {
type service struct {
*services.BasicService
// Subservices manager
subservices *services.Manager
subservicesWatcher *services.FailureWatcher
hasSubservices bool
backend resource.StorageBackend
cfg *setting.Cfg
features featuremgmt.FeatureToggles
db infraDB.DB
stopCh chan struct{}
stoppedCh chan error
handler grpcserver.Provider
tracing trace.Tracer
authenticator func(ctx context.Context) (context.Context, error)
backend resource.StorageBackend
cfg *setting.Cfg
features featuremgmt.FeatureToggles
stopCh chan struct{}
stoppedCh chan error
authenticator func(context.Context) (context.Context, error)
tracing trace.Tracer
db infraDB.DB
log log.Logger
reg prometheus.Registerer
docBuilders resource.DocumentBuilderSupplier
storageMetrics *resource.StorageMetrics
indexMetrics *resource.BleveIndexMetrics
docBuilders resource.DocumentBuilderSupplier
searchRing *ring.Ring
// Handler for the gRPC server
handler grpcserver.Provider
// Ring lifecycle and sharding support
ringLifecycler *ring.BasicLifecycler
// QoS support
queue QOSEnqueueDequeuer
scheduler *scheduler.Scheduler
// Subservices management
hasSubservices bool
subservices *services.Manager
subservicesWatcher *services.FailureWatcher
}
// ProvideUnifiedStorageGrpcService provides a combined storage and search service running on the same gRPC server.
// This is used when running Grafana as a monolith where both services share the same process.
// Each service (storage and search) maintains its own lifecycle but shares the gRPC server.
func ProvideUnifiedStorageGrpcService(
cfg *setting.Cfg,
features featuremgmt.FeatureToggles,
@@ -99,9 +104,9 @@ func ProvideUnifiedStorageGrpcService(
memberlistKVConfig kv.Config,
httpServerRouter *mux.Router,
backend resource.StorageBackend,
) (UnifiedStorageGrpcService, error) {
) (UnifiedGrpcService, error) {
var err error
tracer := otel.Tracer("unified-storage")
tracer := otel.Tracer("unified-storage-combined")
// FIXME: This is a temporary solution while we are migrating to the new authn interceptor
// grpcutils.NewGrpcAuthenticator should be used instead.
@@ -178,7 +183,7 @@ func ProvideUnifiedStorageGrpcService(
MaxSizePerTenant: cfg.QOSMaxSizePerTenant,
Registerer: qosReg,
})
scheduler, err := scheduler.NewScheduler(queue, &scheduler.Config{
sched, err := scheduler.NewScheduler(queue, &scheduler.Config{
NumWorkers: cfg.QOSNumberWorker,
Logger: log,
})
@@ -187,7 +192,7 @@ func ProvideUnifiedStorageGrpcService(
}
s.queue = queue
s.scheduler = scheduler
s.scheduler = sched
subservices = append(subservices, s.queue, s.scheduler)
}
@@ -200,6 +205,7 @@ func ProvideUnifiedStorageGrpcService(
}
// This will be used when running as a dskit service
// Note: We use StorageServer as the module name for backward compatibility
s.BasicService = services.NewBasicService(s.starting, s.running, s.stopping).WithName(modules.StorageServer)
return s, nil
@@ -220,11 +226,6 @@ func (s *service) PrepareDownscale(w http.ResponseWriter, r *http.Request) {
}
}
var (
// operation used by the search-servers to check if they own the namespace
searchOwnerRead = ring.NewOp([]ring.InstanceState{ring.JOINING, ring.ACTIVE, ring.LEAVING}, nil)
)
func (s *service) OwnsIndex(key resource.NamespacedResource) (bool, error) {
if s.searchRing == nil {
return true, nil
@@ -261,59 +262,108 @@ func (s *service) starting(ctx context.Context) error {
return err
}
searchOptions, err := search.NewSearchOptions(s.features, s.cfg, s.docBuilders, s.indexMetrics, s.OwnsIndex)
if err != nil {
return err
}
serverOptions := ServerOptions{
Backend: s.backend,
DB: s.db,
Cfg: s.cfg,
Tracer: s.tracing,
Reg: s.reg,
AccessClient: authzClient,
SearchOptions: searchOptions,
StorageMetrics: s.storageMetrics,
IndexMetrics: s.indexMetrics,
Features: s.features,
QOSQueue: s.queue,
OwnsIndexFn: s.OwnsIndex,
}
// Setup overrides service if enabled
var overridesSvc *resource.OverridesService
if s.cfg.OverridesFilePath != "" {
overridesSvc, err := resource.NewOverridesService(context.Background(), s.log, s.reg, s.tracing, resource.ReloadOptions{
overridesSvc, err = resource.NewOverridesService(context.Background(), s.log, s.reg, s.tracing, resource.ReloadOptions{
FilePath: s.cfg.OverridesFilePath,
ReloadPeriod: s.cfg.OverridesReloadInterval,
})
if err != nil {
return err
}
serverOptions.OverridesService = overridesSvc
}
server, err := NewResourceServer(serverOptions)
// Ensure we have a backend - create one if needed
// This is critical: we create the backend ONCE and share it between search and storage servers
// to avoid duplicate metrics registration
backend := s.backend
if backend == nil {
eDB, err := dbimpl.ProvideResourceDB(s.db, s.cfg, s.tracing)
if err != nil {
return fmt.Errorf("failed to create resource DB: %w", err)
}
isHA := isHighAvailabilityEnabled(s.cfg.SectionWithEnvOverrides("database"),
s.cfg.SectionWithEnvOverrides("resource_api"))
b, err := NewBackend(BackendOptions{
DBProvider: eDB,
Reg: s.reg,
IsHA: isHA,
storageMetrics: s.storageMetrics,
LastImportTimeMaxAge: s.cfg.MaxFileIndexAge,
})
if err != nil {
return fmt.Errorf("failed to create backend: %w", err)
}
// Initialize the backend
if err := b.Init(context.Background()); err != nil {
return fmt.Errorf("failed to initialize backend: %w", err)
}
backend = b
}
// Create search options for the search server
searchOptions, err := search.NewSearchOptions(s.features, s.cfg, s.docBuilders, s.indexMetrics, s.OwnsIndex)
if err != nil {
return err
}
// Create the search server - pass the shared backend
searchServer, err := NewSearchServer(SearchServerOptions{
Backend: backend, // Use the shared backend
DB: s.db,
Cfg: s.cfg,
Tracer: s.tracing,
Reg: s.reg,
AccessClient: authzClient,
SearchOptions: searchOptions,
IndexMetrics: s.indexMetrics,
OwnsIndexFn: s.OwnsIndex,
})
if err != nil {
return err
}
// Create the storage server - pass the shared backend
storageServer, err := NewStorageServer(StorageServerOptions{
Backend: backend, // Use the shared backend
OverridesService: overridesSvc,
DB: s.db,
Cfg: s.cfg,
Tracer: s.tracing,
Reg: s.reg,
AccessClient: authzClient,
StorageMetrics: s.storageMetrics,
Features: s.features,
QOSQueue: s.queue,
})
if err != nil {
return err
}
s.handler, err = grpcserver.ProvideService(s.cfg, s.features, interceptors.AuthenticatorFunc(s.authenticator), s.tracing, prometheus.DefaultRegisterer)
if err != nil {
return err
}
healthService, err := resource.ProvideHealthService(server)
healthService, err := resource.ProvideHealthService(storageServer)
if err != nil {
return err
}
srv := s.handler.GetServer()
resourcepb.RegisterResourceStoreServer(srv, server)
resourcepb.RegisterBulkStoreServer(srv, server)
resourcepb.RegisterResourceIndexServer(srv, server)
resourcepb.RegisterManagedObjectIndexServer(srv, server)
resourcepb.RegisterBlobStoreServer(srv, server)
resourcepb.RegisterDiagnosticsServer(srv, server)
resourcepb.RegisterQuotasServer(srv, server)
// Register storage services
resourcepb.RegisterResourceStoreServer(srv, storageServer)
resourcepb.RegisterBulkStoreServer(srv, storageServer)
resourcepb.RegisterBlobStoreServer(srv, storageServer)
resourcepb.RegisterDiagnosticsServer(srv, storageServer)
resourcepb.RegisterQuotasServer(srv, storageServer)
// Register search services
resourcepb.RegisterResourceIndexServer(srv, searchServer)
resourcepb.RegisterManagedObjectIndexServer(srv, searchServer)
grpc_health_v1.RegisterHealthServer(srv, healthService)
// register reflection service

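The "create the backend ONCE" comment in starting() is really about Prometheus registration semantics: two backends built against the same Registerer would try to register collectors under the same fully-qualified metric names, and the second attempt fails. A small self-contained illustration of that failure mode (plain client_golang behaviour, not code from this change; the metric name is made up):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func newCollector() prometheus.Counter {
	// Stand-in for a metric that each backend instance would try to register.
	return prometheus.NewCounter(prometheus.CounterOpts{
		Name: "unified_storage_example_operations_total",
		Help: "Illustrative counter registered by a hypothetical backend.",
	})
}

func main() {
	reg := prometheus.NewRegistry()

	// The first "backend" registers its metrics without problems.
	fmt.Println("first register:", reg.Register(newCollector()))

	// A second backend sharing the same registerer collides: Register returns
	// an AlreadyRegisteredError (MustRegister would panic instead).
	fmt.Println("second register:", reg.Register(newCollector()))
}
```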
View File

@@ -32,6 +32,7 @@ func RunTestSearchAndStorage(t *testing.T, ctx context.Context, backend resource
nsPrefix := "test-ns"
var server resource.ResourceServer
var searchServer resource.SearchServer
t.Run("Create initial resources in storage", func(t *testing.T) {
initialResources := []struct {
@@ -96,25 +97,34 @@ func RunTestSearchAndStorage(t *testing.T, ctx context.Context, backend resource
})
t.Run("Create a resource server with both backends", func(t *testing.T) {
// Create a resource server with both backends
// Create search server first
var err error
server, err = resource.NewResourceServer(resource.ResourceServerOptions{
Backend: backend,
Search: resource.SearchOptions{
Backend: searchBackend,
Resources: &resource.TestDocumentBuilderSupplier{
GroupsResources: map[string]string{
"test.grafana.app": "testresources",
},
searchOpts := resource.SearchOptions{
Backend: searchBackend,
Resources: &resource.TestDocumentBuilderSupplier{
GroupsResources: map[string]string{
"test.grafana.app": "testresources",
},
},
}
searchServer, err = resource.NewSearchServer(searchOpts, backend, nil, nil, nil, nil)
require.NoError(t, err)
require.NotNil(t, searchServer)
// Initialize the search server
err = searchServer.Init(ctx)
require.NoError(t, err)
// Create a resource server separately from the search server
server, err = resource.NewResourceServer(resource.ResourceServerOptions{
Backend: backend,
})
require.NoError(t, err)
})
t.Run("Search for initial resources", func(t *testing.T) {
// Test 1: Search for initial resources
searchResp, err := server.Search(ctx, &resourcepb.ResourceSearchRequest{
searchResp, err := searchServer.Search(ctx, &resourcepb.ResourceSearchRequest{
Options: &resourcepb.ListOptions{
Key: &resourcepb.ResourceKey{
Group: "test.grafana.app",
@@ -194,7 +204,7 @@ func RunTestSearchAndStorage(t *testing.T, ctx context.Context, backend resource
})
t.Run("Search for documents", func(t *testing.T) {
searchResp, err := server.Search(ctx, &resourcepb.ResourceSearchRequest{
searchResp, err := searchServer.Search(ctx, &resourcepb.ResourceSearchRequest{
Options: &resourcepb.ListOptions{
Key: &resourcepb.ResourceKey{
Group: "test.grafana.app",
@@ -212,7 +222,7 @@ func RunTestSearchAndStorage(t *testing.T, ctx context.Context, backend resource
})
t.Run("Search with tags", func(t *testing.T) {
searchResp, err := server.Search(ctx, &resourcepb.ResourceSearchRequest{
searchResp, err := searchServer.Search(ctx, &resourcepb.ResourceSearchRequest{
Options: &resourcepb.ListOptions{
Key: &resourcepb.ResourceKey{
Group: "test.grafana.app",
@@ -231,7 +241,7 @@ func RunTestSearchAndStorage(t *testing.T, ctx context.Context, backend resource
require.Equal(t, int64(0), searchResp.TotalHits)
// this is the correct way of searching by tag
searchResp, err = server.Search(ctx, &resourcepb.ResourceSearchRequest{
searchResp, err = searchServer.Search(ctx, &resourcepb.ResourceSearchRequest{
Options: &resourcepb.ListOptions{
Key: &resourcepb.ResourceKey{
Group: "test.grafana.app",
@@ -253,7 +263,7 @@ func RunTestSearchAndStorage(t *testing.T, ctx context.Context, backend resource
})
t.Run("Search with specific tag", func(t *testing.T) {
searchResp, err := server.Search(ctx, &resourcepb.ResourceSearchRequest{
searchResp, err := searchServer.Search(ctx, &resourcepb.ResourceSearchRequest{
Options: &resourcepb.ListOptions{
Key: &resourcepb.ResourceKey{
Group: "test.grafana.app",
@@ -272,7 +282,7 @@ func RunTestSearchAndStorage(t *testing.T, ctx context.Context, backend resource
require.Equal(t, int64(0), searchResp.TotalHits)
// this is the correct way of searching by tag
searchResp, err = server.Search(ctx, &resourcepb.ResourceSearchRequest{
searchResp, err = searchServer.Search(ctx, &resourcepb.ResourceSearchRequest{
Options: &resourcepb.ListOptions{
Key: &resourcepb.ResourceKey{
Group: "test.grafana.app",

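With ResourceIndex and ManagedObjectIndex registered as their own gRPC services, a remote caller can hit Search directly rather than through the resource server. A rough sketch, assuming the protoc-generated client constructor follows the usual naming (NewResourceIndexClient) and ignoring the authentication interceptor a real deployment would require:

```go
// Sketch only: calling the search endpoint of a remote search/storage server.
package searchclient

import (
	"context"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
)

func remoteSearch(ctx context.Context, addr string) (int64, error) {
	// Insecure credentials keep the sketch short; production traffic would use
	// TLS plus whatever auth the server-side interceptor expects.
	conn, err := grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		return 0, err
	}
	defer conn.Close()

	client := resourcepb.NewResourceIndexClient(conn)
	resp, err := client.Search(ctx, &resourcepb.ResourceSearchRequest{
		Options: &resourcepb.ListOptions{
			Key: &resourcepb.ResourceKey{
				Group:    "test.grafana.app",
				Resource: "testresources",
			},
		},
	})
	if err != nil {
		return 0, err
	}
	return resp.TotalHits, nil
}
```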
View File

@@ -134,7 +134,7 @@ func StartGrafanaEnvWithDB(t *testing.T, grafDir, cfgPath string) (string, *serv
})
// UnifiedStorageOverGRPC
var storage sql.UnifiedStorageGrpcService
var storage sql.UnifiedGrpcService
if runstore {
storage, err = sql.ProvideUnifiedStorageGrpcService(env.Cfg, env.FeatureToggles, env.SQLStore,
env.Cfg.Logger, prometheus.NewPedanticRegistry(), nil, nil, nil, nil, kv.Config{}, nil, nil)