mirror of
https://github.com/grafana/grafana.git
synced 2026-01-15 23:27:12 +00:00
Compare commits
10 Commits
pyroscope/
...
feature/ex
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
09b493ecd2 | ||
|
|
0f73ba4d40 | ||
|
|
c8da64e4eb | ||
|
|
2fab497c18 | ||
|
|
b0bb71f834 | ||
|
|
a7aa55f908 | ||
|
|
0a61846b5e | ||
|
|
785cc739ee | ||
|
|
4913baaf04 | ||
|
|
c8f1efe7c7 |
@@ -74,7 +74,7 @@ func ToUnifiedStorage(c utils.CommandLine, cfg *setting.Cfg, sqlStore db.DB) err
|
||||
return err
|
||||
}
|
||||
|
||||
grpcClient, err := newUnifiedClient(cfg, sqlStore, featureToggles)
|
||||
grpcClient, err := newUnifiedMigratorClient(cfg, sqlStore, featureToggles)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -92,7 +92,7 @@ func ToUnifiedStorage(c utils.CommandLine, cfg *setting.Cfg, sqlStore db.DB) err
|
||||
return runInteractiveMigration(ctx, cfg, opts, dashboardAccess, grpcClient, start)
|
||||
}
|
||||
|
||||
func runNonInteractiveMigration(ctx context.Context, opts legacy.MigrateOptions, dashboardAccess legacy.MigrationDashboardAccessor, grpcClient resource.ResourceClient, start time.Time) error {
|
||||
func runNonInteractiveMigration(ctx context.Context, opts legacy.MigrateOptions, dashboardAccess legacy.MigrationDashboardAccessor, grpcClient resource.MigratorClient, start time.Time) error {
|
||||
migrator := migrations.ProvideUnifiedMigrator(dashboardAccess, grpcClient)
|
||||
|
||||
opts.WithHistory = true // always include history in non-interactive mode
|
||||
@@ -109,7 +109,7 @@ func runNonInteractiveMigration(ctx context.Context, opts legacy.MigrateOptions,
|
||||
return nil
|
||||
}
|
||||
|
||||
func runInteractiveMigration(ctx context.Context, cfg *setting.Cfg, opts legacy.MigrateOptions, dashboardAccess legacy.MigrationDashboardAccessor, grpcClient resource.ResourceClient, start time.Time) error {
|
||||
func runInteractiveMigration(ctx context.Context, cfg *setting.Cfg, opts legacy.MigrateOptions, dashboardAccess legacy.MigrationDashboardAccessor, grpcClient resource.MigratorClient, start time.Time) error {
|
||||
yes, err := promptYesNo(fmt.Sprintf("Count legacy resources for namespace: %s?", opts.Namespace))
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -225,7 +225,7 @@ func promptYesNo(prompt string) (bool, error) {
|
||||
}
|
||||
}
|
||||
|
||||
func newUnifiedClient(cfg *setting.Cfg, sqlStore db.DB, featureToggles featuremgmt.FeatureToggles) (resource.ResourceClient, error) {
|
||||
func newUnifiedMigratorClient(cfg *setting.Cfg, sqlStore db.DB, featureToggles featuremgmt.FeatureToggles) (resource.MigratorClient, error) {
|
||||
return unified.ProvideUnifiedStorageClient(&unified.Options{
|
||||
Cfg: cfg,
|
||||
Features: featureToggles,
|
||||
|
||||
@@ -10,6 +10,7 @@ const (
|
||||
SearchServerRing string = "search-server-ring"
|
||||
SearchServerDistributor string = "search-server-distributor"
|
||||
StorageServer string = "storage-server"
|
||||
SearchServer string = "search-server"
|
||||
ZanzanaServer string = "zanzana-server"
|
||||
InstrumentationServer string = "instrumentation-server"
|
||||
FrontendServer string = "frontend-server"
|
||||
@@ -17,10 +18,12 @@ const (
|
||||
)
|
||||
|
||||
var dependencyMap = map[string][]string{
|
||||
MemberlistKV: {InstrumentationServer},
|
||||
SearchServerRing: {InstrumentationServer, MemberlistKV},
|
||||
GrafanaAPIServer: {InstrumentationServer},
|
||||
MemberlistKV: {InstrumentationServer},
|
||||
SearchServerRing: {InstrumentationServer, MemberlistKV},
|
||||
GrafanaAPIServer: {InstrumentationServer},
|
||||
// TODO: remove SearchServerRing once split search is done
|
||||
StorageServer: {InstrumentationServer, SearchServerRing},
|
||||
SearchServer: {InstrumentationServer, SearchServerRing},
|
||||
ZanzanaServer: {InstrumentationServer},
|
||||
SearchServerDistributor: {InstrumentationServer, MemberlistKV, SearchServerRing},
|
||||
Core: {},
|
||||
|
||||
@@ -11,91 +11,59 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
_ resource.ResourceClient = (*directResourceClient)(nil)
|
||||
_ resource.StorageClient = (*DirectStorageClient)(nil)
|
||||
)
|
||||
|
||||
// The direct client passes requests directly to the server using the *same* context
|
||||
func NewDirectResourceClient(server resource.ResourceServer) resource.ResourceClient {
|
||||
return &directResourceClient{server}
|
||||
// NewDirectStorageClient creates a client that passes requests directly to the server using the *same* context
|
||||
func NewDirectStorageClient(server resource.ResourceServer) *DirectStorageClient {
|
||||
return &DirectStorageClient{server}
|
||||
}
|
||||
|
||||
type directResourceClient struct {
|
||||
type DirectStorageClient struct {
|
||||
server resource.ResourceServer
|
||||
}
|
||||
|
||||
// Create implements ResourceClient.
|
||||
func (d *directResourceClient) Create(ctx context.Context, in *resourcepb.CreateRequest, opts ...grpc.CallOption) (*resourcepb.CreateResponse, error) {
|
||||
func (d *DirectStorageClient) Create(ctx context.Context, in *resourcepb.CreateRequest, _ ...grpc.CallOption) (*resourcepb.CreateResponse, error) {
|
||||
return d.server.Create(ctx, in)
|
||||
}
|
||||
|
||||
// Delete implements ResourceClient.
|
||||
func (d *directResourceClient) Delete(ctx context.Context, in *resourcepb.DeleteRequest, opts ...grpc.CallOption) (*resourcepb.DeleteResponse, error) {
|
||||
func (d *DirectStorageClient) Delete(ctx context.Context, in *resourcepb.DeleteRequest, _ ...grpc.CallOption) (*resourcepb.DeleteResponse, error) {
|
||||
return d.server.Delete(ctx, in)
|
||||
}
|
||||
|
||||
// GetBlob implements ResourceClient.
|
||||
func (d *directResourceClient) GetBlob(ctx context.Context, in *resourcepb.GetBlobRequest, opts ...grpc.CallOption) (*resourcepb.GetBlobResponse, error) {
|
||||
func (d *DirectStorageClient) GetBlob(ctx context.Context, in *resourcepb.GetBlobRequest, _ ...grpc.CallOption) (*resourcepb.GetBlobResponse, error) {
|
||||
return d.server.GetBlob(ctx, in)
|
||||
}
|
||||
|
||||
// GetStats implements ResourceClient.
|
||||
func (d *directResourceClient) GetStats(ctx context.Context, in *resourcepb.ResourceStatsRequest, opts ...grpc.CallOption) (*resourcepb.ResourceStatsResponse, error) {
|
||||
return d.server.GetStats(ctx, in)
|
||||
}
|
||||
|
||||
// IsHealthy implements ResourceClient.
|
||||
func (d *directResourceClient) IsHealthy(ctx context.Context, in *resourcepb.HealthCheckRequest, opts ...grpc.CallOption) (*resourcepb.HealthCheckResponse, error) {
|
||||
func (d *DirectStorageClient) IsHealthy(ctx context.Context, in *resourcepb.HealthCheckRequest, _ ...grpc.CallOption) (*resourcepb.HealthCheckResponse, error) {
|
||||
return d.server.IsHealthy(ctx, in)
|
||||
}
|
||||
|
||||
// List implements ResourceClient.
|
||||
func (d *directResourceClient) List(ctx context.Context, in *resourcepb.ListRequest, opts ...grpc.CallOption) (*resourcepb.ListResponse, error) {
|
||||
func (d *DirectStorageClient) List(ctx context.Context, in *resourcepb.ListRequest, _ ...grpc.CallOption) (*resourcepb.ListResponse, error) {
|
||||
return d.server.List(ctx, in)
|
||||
}
|
||||
|
||||
func (d *directResourceClient) ListManagedObjects(ctx context.Context, in *resourcepb.ListManagedObjectsRequest, opts ...grpc.CallOption) (*resourcepb.ListManagedObjectsResponse, error) {
|
||||
return d.server.ListManagedObjects(ctx, in)
|
||||
}
|
||||
|
||||
func (d *directResourceClient) CountManagedObjects(ctx context.Context, in *resourcepb.CountManagedObjectsRequest, opts ...grpc.CallOption) (*resourcepb.CountManagedObjectsResponse, error) {
|
||||
return d.server.CountManagedObjects(ctx, in)
|
||||
}
|
||||
|
||||
// PutBlob implements ResourceClient.
|
||||
func (d *directResourceClient) PutBlob(ctx context.Context, in *resourcepb.PutBlobRequest, opts ...grpc.CallOption) (*resourcepb.PutBlobResponse, error) {
|
||||
func (d *DirectStorageClient) PutBlob(ctx context.Context, in *resourcepb.PutBlobRequest, _ ...grpc.CallOption) (*resourcepb.PutBlobResponse, error) {
|
||||
return d.server.PutBlob(ctx, in)
|
||||
}
|
||||
|
||||
// Read implements ResourceClient.
|
||||
func (d *directResourceClient) Read(ctx context.Context, in *resourcepb.ReadRequest, opts ...grpc.CallOption) (*resourcepb.ReadResponse, error) {
|
||||
func (d *DirectStorageClient) Read(ctx context.Context, in *resourcepb.ReadRequest, _ ...grpc.CallOption) (*resourcepb.ReadResponse, error) {
|
||||
return d.server.Read(ctx, in)
|
||||
}
|
||||
|
||||
// Search implements ResourceClient.
|
||||
func (d *directResourceClient) Search(ctx context.Context, in *resourcepb.ResourceSearchRequest, opts ...grpc.CallOption) (*resourcepb.ResourceSearchResponse, error) {
|
||||
return d.server.Search(ctx, in)
|
||||
}
|
||||
|
||||
// Update implements ResourceClient.
|
||||
func (d *directResourceClient) Update(ctx context.Context, in *resourcepb.UpdateRequest, opts ...grpc.CallOption) (*resourcepb.UpdateResponse, error) {
|
||||
func (d *DirectStorageClient) Update(ctx context.Context, in *resourcepb.UpdateRequest, _ ...grpc.CallOption) (*resourcepb.UpdateResponse, error) {
|
||||
return d.server.Update(ctx, in)
|
||||
}
|
||||
|
||||
// Watch implements ResourceClient.
|
||||
func (d *directResourceClient) Watch(ctx context.Context, in *resourcepb.WatchRequest, opts ...grpc.CallOption) (resourcepb.ResourceStore_WatchClient, error) {
|
||||
func (d *DirectStorageClient) Watch(_ context.Context, _ *resourcepb.WatchRequest, _ ...grpc.CallOption) (resourcepb.ResourceStore_WatchClient, error) {
|
||||
return nil, fmt.Errorf("watch not supported with direct resource client")
|
||||
}
|
||||
|
||||
// BulkProcess implements resource.ResourceClient.
|
||||
func (d *directResourceClient) BulkProcess(ctx context.Context, opts ...grpc.CallOption) (resourcepb.BulkStore_BulkProcessClient, error) {
|
||||
return nil, fmt.Errorf("BulkProcess not supported with direct resource client")
|
||||
}
|
||||
|
||||
// RebuildIndexes implements resource.ResourceClient.
|
||||
func (b *directResourceClient) RebuildIndexes(ctx context.Context, req *resourcepb.RebuildIndexesRequest, opts ...grpc.CallOption) (*resourcepb.RebuildIndexesResponse, error) {
|
||||
return nil, fmt.Errorf("not implemented")
|
||||
}
|
||||
|
||||
func (b *directResourceClient) GetQuotaUsage(ctx context.Context, req *resourcepb.QuotaUsageRequest, opts ...grpc.CallOption) (*resourcepb.QuotaUsageResponse, error) {
|
||||
return nil, fmt.Errorf("not implemented")
|
||||
}
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
"k8s.io/apiserver/pkg/registry/rest"
|
||||
|
||||
"github.com/grafana/authlib/types"
|
||||
|
||||
"github.com/grafana/grafana/pkg/apimachinery/utils"
|
||||
grafanaregistry "github.com/grafana/grafana/pkg/apiserver/registry/generic"
|
||||
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"
|
||||
@@ -40,7 +41,7 @@ func (s *DashboardStorage) NewStore(dash utils.ResourceInfo, scheme *runtime.Sch
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
client := legacy.NewDirectResourceClient(server) // same context
|
||||
client := legacy.NewDirectStorageClient(server) // same context
|
||||
optsGetter := apistore.NewRESTOptionsGetterForClient(client, nil,
|
||||
defaultOpts.StorageConfig.Config, nil,
|
||||
)
|
||||
|
||||
@@ -37,7 +37,6 @@ import (
|
||||
"github.com/grafana/grafana/pkg/services/folder"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/apistore"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resource"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
||||
)
|
||||
|
||||
@@ -74,7 +73,7 @@ func RegisterAPIService(cfg *setting.Cfg,
|
||||
acService accesscontrol.Service,
|
||||
accessClient authlib.AccessClient,
|
||||
registerer prometheus.Registerer,
|
||||
unified resource.ResourceClient,
|
||||
unified resourcepb.ResourceIndexClient,
|
||||
zanzanaClient zanzana.Client,
|
||||
) *FolderAPIBuilder {
|
||||
builder := &FolderAPIBuilder{
|
||||
@@ -93,7 +92,7 @@ func RegisterAPIService(cfg *setting.Cfg,
|
||||
return builder
|
||||
}
|
||||
|
||||
func NewAPIService(ac authlib.AccessClient, searcher resource.ResourceClient, features featuremgmt.FeatureToggles, zanzanaClient zanzana.Client, resourcePermissionsSvc *dynamic.NamespaceableResourceInterface) *FolderAPIBuilder {
|
||||
func NewAPIService(ac authlib.AccessClient, searcher resourcepb.ResourceIndexClient, features featuremgmt.FeatureToggles, zanzanaClient zanzana.Client, resourcePermissionsSvc *dynamic.NamespaceableResourceInterface) *FolderAPIBuilder {
|
||||
return &FolderAPIBuilder{
|
||||
features: features,
|
||||
accessClient: ac,
|
||||
|
||||
@@ -742,7 +742,7 @@ func NewLocalStore(resourceInfo utils.ResourceInfo, scheme *runtime.Scheme, defa
|
||||
return nil, err
|
||||
}
|
||||
|
||||
client := resource.NewLocalResourceClient(server)
|
||||
client := resource.NewLocalResourceClient(server, nil)
|
||||
optsGetter := apistore.NewRESTOptionsGetterForClient(client, nil, defaultOpts.StorageConfig.Config, nil)
|
||||
|
||||
store, err := grafanaregistry.NewRegistryStore(scheme, resourceInfo, optsGetter)
|
||||
|
||||
@@ -198,11 +198,23 @@ func (s *ModuleServer) Run() error {
|
||||
//}
|
||||
|
||||
m.RegisterModule(modules.StorageServer, func() (services.Service, error) {
|
||||
// TODO: remove when we change deployment to use search server target
|
||||
if s.cfg.EnableSearch {
|
||||
docBuilders, err := InitializeDocumentBuilders(s.cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return sql.ProvideUnifiedStorageGrpcService(s.cfg, s.features, nil, s.log, s.registerer, docBuilders, s.storageMetrics, s.indexMetrics, s.searchServerRing, s.MemberlistKVConfig, s.httpServerRouter, s.storageBackend)
|
||||
}
|
||||
return sql.ProvideStorageService(s.cfg, s.features, nil, s.log, s.registerer, s.storageMetrics, s.storageBackend)
|
||||
})
|
||||
|
||||
m.RegisterModule(modules.SearchServer, func() (services.Service, error) {
|
||||
docBuilders, err := InitializeDocumentBuilders(s.cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return sql.ProvideUnifiedStorageGrpcService(s.cfg, s.features, nil, s.log, s.registerer, docBuilders, s.storageMetrics, s.indexMetrics, s.searchServerRing, s.MemberlistKVConfig, s.httpServerRouter, s.storageBackend)
|
||||
return sql.ProvideUnifiedSearchGrpcService(s.cfg, s.features, nil, s.log, s.registerer, docBuilders, s.indexMetrics, s.searchServerRing, s.MemberlistKVConfig, s.storageBackend, s.httpServerRouter)
|
||||
})
|
||||
|
||||
m.RegisterModule(modules.ZanzanaServer, func() (services.Service, error) {
|
||||
|
||||
@@ -372,7 +372,7 @@ func initModuleServerForTest(
|
||||
return testModuleServer{server: ms, grpcAddress: cfg.GRPCServer.Address, httpPort: cfg.HTTPPort, healthClient: healthClient, id: cfg.InstanceID}
|
||||
}
|
||||
|
||||
func createBaselineServer(t *testing.T, dbType, dbConnStr string, testNamespaces []string) resource.ResourceServer {
|
||||
func createBaselineServer(t *testing.T, dbType, dbConnStr string, testNamespaces []string) resource.SearchServer {
|
||||
cfg := setting.NewCfg()
|
||||
section, err := cfg.Raw.NewSection("database")
|
||||
require.NoError(t, err)
|
||||
@@ -391,17 +391,16 @@ func createBaselineServer(t *testing.T, dbType, dbConnStr string, testNamespaces
|
||||
require.NoError(t, err)
|
||||
searchOpts, err := search.NewSearchOptions(features, cfg, docBuilders, nil, nil)
|
||||
require.NoError(t, err)
|
||||
server, err := sql.NewResourceServer(sql.ServerOptions{
|
||||
DB: nil,
|
||||
Cfg: cfg,
|
||||
Tracer: tracer,
|
||||
Reg: nil,
|
||||
AccessClient: nil,
|
||||
SearchOptions: searchOpts,
|
||||
StorageMetrics: nil,
|
||||
IndexMetrics: nil,
|
||||
Features: features,
|
||||
QOSQueue: nil,
|
||||
searchServer, err := sql.NewSearchServer(sql.SearchServerOptions{
|
||||
Cfg: cfg,
|
||||
Tracer: tracer,
|
||||
SearchOptions: searchOpts,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
storageServer, err := sql.NewStorageServer(sql.StorageServerOptions{
|
||||
Cfg: cfg,
|
||||
Tracer: tracer,
|
||||
Features: features,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
@@ -417,12 +416,12 @@ func createBaselineServer(t *testing.T, dbType, dbConnStr string, testNamespaces
|
||||
|
||||
for _, ns := range testNamespaces {
|
||||
for range rand.Intn(maxPlaylistPerNamespace) + 1 {
|
||||
_, err = server.Create(ctx, generatePlaylistPayload(ns))
|
||||
_, err = storageServer.Create(ctx, generatePlaylistPayload(ns))
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
return server
|
||||
return searchServer
|
||||
}
|
||||
|
||||
var counter int
|
||||
|
||||
23
pkg/server/wire_gen.go
generated
23
pkg/server/wire_gen.go
generated
@@ -513,6 +513,11 @@ func Initialize(ctx context.Context, cfg *setting.Cfg, opts Options, apiOpts api
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
storageMetrics := resource.ProvideStorageMetrics(registerer)
|
||||
storageBackend, err := sql.ProvideStorageBackend(cfg, sqlStore, tracer, registerer, storageMetrics)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
options := &unified.Options{
|
||||
Cfg: cfg,
|
||||
Features: featureToggles,
|
||||
@@ -522,8 +527,8 @@ func Initialize(ctx context.Context, cfg *setting.Cfg, opts Options, apiOpts api
|
||||
Authzc: accessClient,
|
||||
Docs: documentBuilderSupplier,
|
||||
SecureValues: inlineSecureValueSupport,
|
||||
Backend: storageBackend,
|
||||
}
|
||||
storageMetrics := resource.ProvideStorageMetrics(registerer)
|
||||
bleveIndexMetrics := resource.ProvideIndexMetrics(registerer)
|
||||
resourceClient, err := unified.ProvideUnifiedStorageClient(options, storageMetrics, bleveIndexMetrics)
|
||||
if err != nil {
|
||||
@@ -1173,6 +1178,11 @@ func InitializeForTest(ctx context.Context, t sqlutil.ITestDB, testingT interfac
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
storageMetrics := resource.ProvideStorageMetrics(registerer)
|
||||
storageBackend, err := sql.ProvideStorageBackend(cfg, sqlStore, tracer, registerer, storageMetrics)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
options := &unified.Options{
|
||||
Cfg: cfg,
|
||||
Features: featureToggles,
|
||||
@@ -1182,8 +1192,8 @@ func InitializeForTest(ctx context.Context, t sqlutil.ITestDB, testingT interfac
|
||||
Authzc: accessClient,
|
||||
Docs: documentBuilderSupplier,
|
||||
SecureValues: inlineSecureValueSupport,
|
||||
Backend: storageBackend,
|
||||
}
|
||||
storageMetrics := resource.ProvideStorageMetrics(registerer)
|
||||
bleveIndexMetrics := resource.ProvideIndexMetrics(registerer)
|
||||
resourceClient, err := unified.ProvideUnifiedStorageClient(options, storageMetrics, bleveIndexMetrics)
|
||||
if err != nil {
|
||||
@@ -1748,7 +1758,14 @@ func InitializeModuleServer(cfg *setting.Cfg, opts Options, apiOpts api.ServerOp
|
||||
hooksService := hooks.ProvideService()
|
||||
ossLicensingService := licensing.ProvideService(cfg, hooksService)
|
||||
moduleRegisterer := ProvideNoopModuleRegisterer()
|
||||
storageBackend, err := sql.ProvideStorageBackend(cfg)
|
||||
ossMigrations := migrations.ProvideOSSMigrations(featureToggles)
|
||||
inProcBus := bus.ProvideBus(tracingService)
|
||||
sqlStore, err := sqlstore.ProvideService(cfg, featureToggles, ossMigrations, inProcBus, tracingService)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tracer := otelTracer()
|
||||
storageBackend, err := sql.ProvideStorageBackend(cfg, sqlStore, tracer, registerer, storageMetrics)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -6,6 +6,9 @@ package server
|
||||
|
||||
import (
|
||||
"github.com/google/wire"
|
||||
"github.com/grafana/grafana/pkg/bus"
|
||||
"github.com/grafana/grafana/pkg/infra/db"
|
||||
"github.com/grafana/grafana/pkg/services/sqlstore"
|
||||
|
||||
"github.com/grafana/grafana/pkg/configprovider"
|
||||
"github.com/grafana/grafana/pkg/infra/metrics"
|
||||
@@ -65,6 +68,7 @@ import (
|
||||
"github.com/grafana/grafana/pkg/storage/legacysql"
|
||||
"github.com/grafana/grafana/pkg/storage/unified"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resource"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
||||
search2 "github.com/grafana/grafana/pkg/storage/unified/search"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/search/builders"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/sql"
|
||||
@@ -145,8 +149,10 @@ var wireExtsBasicSet = wire.NewSet(
|
||||
sandbox.ProvideService,
|
||||
wire.Bind(new(sandbox.Sandbox), new(*sandbox.Service)),
|
||||
wire.Struct(new(unified.Options), "*"),
|
||||
unified.ProvideUnifiedStorageClient,
|
||||
sql.ProvideStorageBackend,
|
||||
unified.ProvideUnifiedStorageClient,
|
||||
wire.Bind(new(resourcepb.ResourceIndexClient), new(resource.ResourceClient)),
|
||||
wire.Bind(new(resource.MigratorClient), new(resource.ResourceClient)),
|
||||
builder.ProvideDefaultBuildHandlerChainFuncFromBuilders,
|
||||
aggregatorrunner.ProvideNoopAggregatorConfigurator,
|
||||
apisregistry.WireSetExts,
|
||||
@@ -195,6 +201,16 @@ var wireExtsModuleServerSet = wire.NewSet(
|
||||
tracing.ProvideTracingConfig,
|
||||
tracing.ProvideService,
|
||||
wire.Bind(new(tracing.Tracer), new(*tracing.TracingService)),
|
||||
otelTracer,
|
||||
// Bus
|
||||
bus.ProvideBus,
|
||||
wire.Bind(new(bus.Bus), new(*bus.InProcBus)),
|
||||
// Database migrations
|
||||
migrations.ProvideOSSMigrations,
|
||||
wire.Bind(new(registry.DatabaseMigrator), new(*migrations.OSSMigrations)),
|
||||
// Database
|
||||
sqlstore.ProvideService,
|
||||
wire.Bind(new(db.DB), new(*sqlstore.SQLStore)),
|
||||
// Unified storage
|
||||
resource.ProvideStorageMetrics,
|
||||
resource.ProvideIndexMetrics,
|
||||
|
||||
@@ -26,7 +26,7 @@ var _ generic.RESTOptionsGetter = (*RESTOptionsGetter)(nil)
|
||||
type StorageOptionsRegister func(gr schema.GroupResource, opts StorageOptions)
|
||||
|
||||
type RESTOptionsGetter struct {
|
||||
client resource.ResourceClient
|
||||
client resource.StorageClient
|
||||
secrets secret.InlineSecureValueSupport
|
||||
original storagebackend.Config
|
||||
configProvider RestConfigProvider
|
||||
@@ -36,7 +36,7 @@ type RESTOptionsGetter struct {
|
||||
}
|
||||
|
||||
func NewRESTOptionsGetterForClient(
|
||||
client resource.ResourceClient,
|
||||
client resource.StorageClient,
|
||||
secrets secret.InlineSecureValueSupport,
|
||||
original storagebackend.Config,
|
||||
configProvider RestConfigProvider,
|
||||
@@ -79,7 +79,7 @@ func NewRESTOptionsGetterMemory(originalStorageConfig storagebackend.Config, sec
|
||||
}
|
||||
|
||||
return NewRESTOptionsGetterForClient(
|
||||
resource.NewLocalResourceClient(server),
|
||||
resource.NewLocalResourceClient(server, nil),
|
||||
secrets,
|
||||
originalStorageConfig,
|
||||
nil,
|
||||
@@ -118,7 +118,7 @@ func NewRESTOptionsGetterForFileXX(path string,
|
||||
}
|
||||
|
||||
return NewRESTOptionsGetterForClient(
|
||||
resource.NewLocalResourceClient(server),
|
||||
resource.NewLocalResourceClient(server, nil),
|
||||
nil, // secrets
|
||||
originalStorageConfig,
|
||||
nil,
|
||||
|
||||
@@ -88,7 +88,7 @@ type Storage struct {
|
||||
trigger storage.IndexerFuncs
|
||||
indexers *cache.Indexers
|
||||
|
||||
store resource.ResourceClient
|
||||
store resource.StorageClient
|
||||
getKey func(string) (*resourcepb.ResourceKey, error)
|
||||
snowflake *snowflake.Node // used to enforce internal ids
|
||||
configProvider RestConfigProvider // used for provisioning
|
||||
@@ -112,7 +112,7 @@ type RestConfigProvider interface {
|
||||
// NewStorage instantiates a new Storage.
|
||||
func NewStorage(
|
||||
config *storagebackend.ConfigForResource,
|
||||
store resource.ResourceClient,
|
||||
store resource.StorageClient,
|
||||
keyFunc func(obj runtime.Object) (string, error),
|
||||
keyParser func(key string) (*resourcepb.ResourceKey, error),
|
||||
newFunc func() runtime.Object,
|
||||
|
||||
@@ -156,7 +156,7 @@ func testSetup(t testing.TB, opts ...setupOption) (context.Context, storage.Inte
|
||||
default:
|
||||
t.Fatalf("unsupported storage type: %s", setupOpts.storageType)
|
||||
}
|
||||
client := resource.NewLocalResourceClient(server)
|
||||
client := resource.NewLocalResourceClient(server, nil)
|
||||
|
||||
config := storagebackend.NewDefaultConfig(setupOpts.prefix, setupOpts.codec)
|
||||
store, destroyFunc, err := apistore.NewStorage(
|
||||
|
||||
@@ -21,6 +21,7 @@ import (
|
||||
"github.com/grafana/dskit/grpcclient"
|
||||
"github.com/grafana/dskit/middleware"
|
||||
"github.com/grafana/dskit/services"
|
||||
|
||||
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"
|
||||
infraDB "github.com/grafana/grafana/pkg/infra/db"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
@@ -45,6 +46,7 @@ type Options struct {
|
||||
Authzc types.AccessClient
|
||||
Docs resource.DocumentBuilderSupplier
|
||||
SecureValues secrets.InlineSecureValueSupport
|
||||
Backend resource.StorageBackend // Shared backend to avoid duplicate metrics registration
|
||||
}
|
||||
|
||||
type clientMetrics struct {
|
||||
@@ -66,7 +68,7 @@ func ProvideUnifiedStorageClient(opts *Options,
|
||||
BlobStoreURL: apiserverCfg.Key("blob_url").MustString(""),
|
||||
BlobThresholdBytes: apiserverCfg.Key("blob_threshold_bytes").MustInt(options.BlobThresholdDefault),
|
||||
GrpcClientKeepaliveTime: apiserverCfg.Key("grpc_client_keepalive_time").MustDuration(0),
|
||||
}, opts.Cfg, opts.Features, opts.DB, opts.Tracer, opts.Reg, opts.Authzc, opts.Docs, storageMetrics, indexMetrics, opts.SecureValues)
|
||||
}, opts.Cfg, opts.Features, opts.DB, opts.Tracer, opts.Reg, opts.Authzc, opts.Docs, storageMetrics, indexMetrics, opts.SecureValues, opts.Backend)
|
||||
if err == nil {
|
||||
// Decide whether to disable SQL fallback stats per resource in Mode 5.
|
||||
// Otherwise we would still try to query the legacy SQL database in Mode 5.
|
||||
@@ -102,6 +104,7 @@ func newClient(opts options.StorageOptions,
|
||||
storageMetrics *resource.StorageMetrics,
|
||||
indexMetrics *resource.BleveIndexMetrics,
|
||||
secure secrets.InlineSecureValueSupport,
|
||||
backend resource.StorageBackend,
|
||||
) (resource.ResourceClient, error) {
|
||||
ctx := context.Background()
|
||||
|
||||
@@ -135,7 +138,7 @@ func newClient(opts options.StorageOptions,
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return resource.NewLocalResourceClient(server), nil
|
||||
return resource.NewLocalResourceClient(server, nil), nil
|
||||
|
||||
case options.StorageTypeUnifiedGrpc:
|
||||
if opts.Address == "" {
|
||||
@@ -173,15 +176,14 @@ func newClient(opts options.StorageOptions,
|
||||
return nil, err
|
||||
}
|
||||
|
||||
serverOptions := sql.ServerOptions{
|
||||
storageOptions := sql.StorageServerOptions{
|
||||
Backend: backend,
|
||||
DB: db,
|
||||
Cfg: cfg,
|
||||
Tracer: tracer,
|
||||
Reg: reg,
|
||||
AccessClient: authzc,
|
||||
SearchOptions: searchOptions,
|
||||
StorageMetrics: storageMetrics,
|
||||
IndexMetrics: indexMetrics,
|
||||
Features: features,
|
||||
SecureValues: secure,
|
||||
}
|
||||
@@ -208,7 +210,7 @@ func newClient(opts options.StorageOptions,
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to start scheduler: %w", err)
|
||||
}
|
||||
serverOptions.QOSQueue = queue
|
||||
storageOptions.QOSQueue = queue
|
||||
}
|
||||
|
||||
// only enable if an overrides file path is provided
|
||||
@@ -220,15 +222,33 @@ func newClient(opts options.StorageOptions,
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
serverOptions.OverridesService = overridesSvc
|
||||
storageOptions.OverridesService = overridesSvc
|
||||
}
|
||||
|
||||
server, err := sql.NewResourceServer(serverOptions)
|
||||
// Create the storage server with shared backend
|
||||
storageServer, err := sql.NewStorageServer(storageOptions)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return resource.NewLocalResourceClient(server), nil
|
||||
|
||||
var searchServer resource.SearchServer
|
||||
if cfg.EnableSearch {
|
||||
searchServer, err = sql.NewSearchServer(sql.SearchServerOptions{
|
||||
Backend: backend,
|
||||
DB: db,
|
||||
Cfg: cfg,
|
||||
Tracer: tracer,
|
||||
Reg: reg,
|
||||
AccessClient: authzc,
|
||||
SearchOptions: searchOptions,
|
||||
IndexMetrics: indexMetrics,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return resource.NewLocalResourceClient(storageServer, searchServer), nil
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -46,6 +46,7 @@ func TestUnifiedStorageClient(t *testing.T) {
|
||||
nil,
|
||||
nil,
|
||||
nil,
|
||||
nil,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
@@ -80,6 +81,7 @@ func TestUnifiedStorageClient(t *testing.T) {
|
||||
nil,
|
||||
nil,
|
||||
nil,
|
||||
nil,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
|
||||
@@ -48,7 +48,7 @@ func buildCollectionSettings(opts legacy.MigrateOptions) resource.BulkSettings {
|
||||
}
|
||||
|
||||
type resourceClientStreamProvider struct {
|
||||
client resource.ResourceClient
|
||||
client resource.MigratorClient
|
||||
}
|
||||
|
||||
func (r *resourceClientStreamProvider) createStream(ctx context.Context, opts legacy.MigrateOptions) (resourcepb.BulkStore_BulkProcessClient, error) {
|
||||
@@ -71,7 +71,7 @@ func (b *bulkStoreClientStreamProvider) createStream(ctx context.Context, opts l
|
||||
// This can migrate Folders, Dashboards and LibraryPanels
|
||||
func ProvideUnifiedMigrator(
|
||||
dashboardAccess legacy.MigrationDashboardAccessor,
|
||||
client resource.ResourceClient,
|
||||
client resource.MigratorClient,
|
||||
) UnifiedMigrator {
|
||||
return newUnifiedMigrator(
|
||||
dashboardAccess,
|
||||
|
||||
@@ -31,13 +31,31 @@ import (
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
||||
)
|
||||
|
||||
//go:generate mockery --name ResourceClient --structname MockResourceClient --inpackage --filename client_mock.go --with-expecter
|
||||
type ResourceClient interface {
|
||||
resourcepb.ResourceStoreClient
|
||||
// SearchClient is used to interact with unified search
|
||||
type SearchClient interface {
|
||||
resourcepb.ResourceIndexClient
|
||||
resourcepb.ManagedObjectIndexClient
|
||||
resourcepb.BulkStoreClient
|
||||
}
|
||||
|
||||
// StorageClient is used to interact with unified storage
|
||||
type StorageClient interface {
|
||||
resourcepb.ResourceStoreClient
|
||||
resourcepb.BlobStoreClient
|
||||
}
|
||||
|
||||
// MigratorClient is used to perform migrations to unified storage
|
||||
type MigratorClient interface {
|
||||
resourcepb.BulkStoreClient
|
||||
GetStats(ctx context.Context, in *resourcepb.ResourceStatsRequest, opts ...grpc.CallOption) (*resourcepb.ResourceStatsResponse, error)
|
||||
}
|
||||
|
||||
// ResourceClient combines all resource-related clients and should be avoided in favor of more specific interfaces when possible
|
||||
//
|
||||
//go:generate mockery --name ResourceClient --structname MockResourceClient --inpackage --filename client_mock.go --with-expecter
|
||||
type ResourceClient interface {
|
||||
StorageClient
|
||||
SearchClient
|
||||
MigratorClient
|
||||
resourcepb.DiagnosticsClient
|
||||
resourcepb.QuotasClient
|
||||
}
|
||||
@@ -92,16 +110,15 @@ func NewLegacyResourceClient(channel grpc.ClientConnInterface, indexChannel grpc
|
||||
return newResourceClient(cc, cci)
|
||||
}
|
||||
|
||||
func NewLocalResourceClient(server ResourceServer) ResourceClient {
|
||||
func NewLocalResourceClient(server ResourceServer, searchServer SearchServer) ResourceClient {
|
||||
// scenario: local in-proc
|
||||
channel := &inprocgrpc.Channel{}
|
||||
indexChannel := &inprocgrpc.Channel{}
|
||||
tracer := otel.Tracer("github.com/grafana/grafana/pkg/storage/unified/resource")
|
||||
|
||||
grpcAuthInt := grpcutils.NewUnsafeAuthenticator(tracer)
|
||||
for _, desc := range []*grpc.ServiceDesc{
|
||||
&resourcepb.ResourceStore_ServiceDesc,
|
||||
&resourcepb.ResourceIndex_ServiceDesc,
|
||||
&resourcepb.ManagedObjectIndex_ServiceDesc,
|
||||
&resourcepb.BlobStore_ServiceDesc,
|
||||
&resourcepb.BulkStore_ServiceDesc,
|
||||
&resourcepb.Diagnostics_ServiceDesc,
|
||||
@@ -117,13 +134,31 @@ func NewLocalResourceClient(server ResourceServer) ResourceClient {
|
||||
)
|
||||
}
|
||||
|
||||
// Register search services on the index channel if searchServer is provided
|
||||
if searchServer != nil {
|
||||
for _, desc := range []*grpc.ServiceDesc{
|
||||
&resourcepb.ResourceIndex_ServiceDesc,
|
||||
&resourcepb.ManagedObjectIndex_ServiceDesc,
|
||||
} {
|
||||
indexChannel.RegisterService(
|
||||
grpchan.InterceptServer(
|
||||
desc,
|
||||
grpcAuth.UnaryServerInterceptor(grpcAuthInt),
|
||||
grpcAuth.StreamServerInterceptor(grpcAuthInt),
|
||||
),
|
||||
searchServer,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
clientInt := authnlib.NewGrpcClientInterceptor(
|
||||
ProvideInProcExchanger(),
|
||||
authnlib.WithClientInterceptorIDTokenExtractor(idTokenExtractor),
|
||||
)
|
||||
|
||||
cc := grpchan.InterceptClientConn(channel, clientInt.UnaryClientInterceptor, clientInt.StreamClientInterceptor)
|
||||
return newResourceClient(cc, cc)
|
||||
cci := grpchan.InterceptClientConn(indexChannel, clientInt.UnaryClientInterceptor, clientInt.StreamClientInterceptor)
|
||||
return newResourceClient(cc, cci)
|
||||
}
|
||||
|
||||
type RemoteResourceClientConfig struct {
|
||||
|
||||
@@ -127,7 +127,10 @@ type SearchBackend interface {
|
||||
GetOpenIndexes() []NamespacedResource
|
||||
}
|
||||
|
||||
// This supports indexing+search regardless of implementation
|
||||
var _ SearchServer = &searchSupport{}
|
||||
|
||||
// This supports indexing+search regardless of implementation.
|
||||
// Implements SearchServer interface.
|
||||
type searchSupport struct {
|
||||
log log.Logger
|
||||
storage StorageBackend
|
||||
@@ -160,6 +163,10 @@ var (
|
||||
_ resourcepb.ManagedObjectIndexServer = (*searchSupport)(nil)
|
||||
)
|
||||
|
||||
func NewSearchServer(opts SearchOptions, storage StorageBackend, access types.AccessClient, blob BlobSupport, indexMetrics *BleveIndexMetrics, ownsIndexFn func(key NamespacedResource) (bool, error)) (SearchServer, error) {
|
||||
return newSearchSupport(opts, storage, access, blob, indexMetrics, ownsIndexFn)
|
||||
}
|
||||
|
||||
func newSearchSupport(opts SearchOptions, storage StorageBackend, access types.AccessClient, blob BlobSupport, indexMetrics *BleveIndexMetrics, ownsIndexFn func(key NamespacedResource) (bool, error)) (support *searchSupport, err error) {
|
||||
// No backend search support
|
||||
if opts.Backend == nil {
|
||||
@@ -598,6 +605,22 @@ func (s *searchSupport) buildIndexes(ctx context.Context) (int, error) {
|
||||
return totalBatchesIndexed, nil
|
||||
}
|
||||
|
||||
func (s *searchSupport) Init(ctx context.Context) error {
|
||||
return s.init(ctx)
|
||||
}
|
||||
|
||||
func (s *searchSupport) Stop(_ context.Context) error {
|
||||
s.stop()
|
||||
return nil
|
||||
}
|
||||
|
||||
// IsHealthy implements resourcepb.DiagnosticsServer
|
||||
func (s *searchSupport) IsHealthy(ctx context.Context, req *resourcepb.HealthCheckRequest) (*resourcepb.HealthCheckResponse, error) {
|
||||
return &resourcepb.HealthCheckResponse{
|
||||
Status: resourcepb.HealthCheckResponse_SERVING,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *searchSupport) init(ctx context.Context) error {
|
||||
origCtx := ctx
|
||||
|
||||
|
||||
@@ -60,7 +60,7 @@ func ProvideSearchDistributorServer(cfg *setting.Cfg, features featuremgmt.Featu
|
||||
}
|
||||
|
||||
type RingClient struct {
|
||||
Client ResourceClient
|
||||
Client SearchClient
|
||||
grpc_health_v1.HealthClient
|
||||
Conn *grpc.ClientConn
|
||||
}
|
||||
@@ -99,7 +99,7 @@ var (
|
||||
func (ds *distributorServer) Search(ctx context.Context, r *resourcepb.ResourceSearchRequest) (*resourcepb.ResourceSearchResponse, error) {
|
||||
ctx, span := ds.tracing.Start(ctx, "distributor.Search")
|
||||
defer span.End()
|
||||
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Options.Key.Namespace, "Search")
|
||||
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Options.Key.Namespace)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -110,7 +110,7 @@ func (ds *distributorServer) Search(ctx context.Context, r *resourcepb.ResourceS
|
||||
func (ds *distributorServer) GetStats(ctx context.Context, r *resourcepb.ResourceStatsRequest) (*resourcepb.ResourceStatsResponse, error) {
|
||||
ctx, span := ds.tracing.Start(ctx, "distributor.GetStats")
|
||||
defer span.End()
|
||||
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace, "GetStats")
|
||||
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -215,7 +215,7 @@ func (ds *distributorServer) RebuildIndexes(ctx context.Context, r *resourcepb.R
|
||||
func (ds *distributorServer) CountManagedObjects(ctx context.Context, r *resourcepb.CountManagedObjectsRequest) (*resourcepb.CountManagedObjectsResponse, error) {
|
||||
ctx, span := ds.tracing.Start(ctx, "distributor.CountManagedObjects")
|
||||
defer span.End()
|
||||
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace, "CountManagedObjects")
|
||||
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -226,7 +226,7 @@ func (ds *distributorServer) CountManagedObjects(ctx context.Context, r *resourc
|
||||
func (ds *distributorServer) ListManagedObjects(ctx context.Context, r *resourcepb.ListManagedObjectsRequest) (*resourcepb.ListManagedObjectsResponse, error) {
|
||||
ctx, span := ds.tracing.Start(ctx, "distributor.ListManagedObjects")
|
||||
defer span.End()
|
||||
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace, "ListManagedObjects")
|
||||
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -234,7 +234,7 @@ func (ds *distributorServer) ListManagedObjects(ctx context.Context, r *resource
|
||||
return client.ListManagedObjects(ctx, r)
|
||||
}
|
||||
|
||||
func (ds *distributorServer) getClientToDistributeRequest(ctx context.Context, namespace string, methodName string) (context.Context, ResourceClient, error) {
|
||||
func (ds *distributorServer) getClientToDistributeRequest(ctx context.Context, namespace string) (context.Context, SearchClient, error) {
|
||||
ringHasher := fnv.New32a()
|
||||
_, err := ringHasher.Write([]byte(namespace))
|
||||
if err != nil {
|
||||
|
||||
@@ -34,12 +34,18 @@ import (
|
||||
|
||||
var tracer = otel.Tracer("github.com/grafana/grafana/pkg/storage/unified/resource")
|
||||
|
||||
type SearchServer interface {
|
||||
LifecycleHooks
|
||||
|
||||
resourcepb.ResourceIndexServer
|
||||
resourcepb.ManagedObjectIndexServer
|
||||
resourcepb.DiagnosticsServer
|
||||
}
|
||||
|
||||
// ResourceServer implements all gRPC services
|
||||
type ResourceServer interface {
|
||||
resourcepb.ResourceStoreServer
|
||||
resourcepb.BulkStoreServer
|
||||
resourcepb.ResourceIndexServer
|
||||
resourcepb.ManagedObjectIndexServer
|
||||
resourcepb.BlobStoreServer
|
||||
resourcepb.DiagnosticsServer
|
||||
resourcepb.QuotasServer
|
||||
@@ -221,9 +227,6 @@ type ResourceServerOptions struct {
|
||||
// The blob configuration
|
||||
Blob BlobConfig
|
||||
|
||||
// Search options
|
||||
Search SearchOptions
|
||||
|
||||
// Quota service
|
||||
OverridesService *OverridesService
|
||||
|
||||
@@ -251,16 +254,15 @@ type ResourceServerOptions struct {
|
||||
|
||||
storageMetrics *StorageMetrics
|
||||
|
||||
IndexMetrics *BleveIndexMetrics
|
||||
|
||||
// MaxPageSizeBytes is the maximum size of a page in bytes.
|
||||
MaxPageSizeBytes int
|
||||
// IndexMinUpdateInterval is the time to wait after a successful write operation to ensure read-after-write consistency in search.
|
||||
// This config is shared with search
|
||||
IndexMinUpdateInterval time.Duration
|
||||
|
||||
// QOSQueue is the quality of service queue used to enqueue
|
||||
QOSQueue QOSEnqueuer
|
||||
QOSConfig QueueConfig
|
||||
|
||||
OwnsIndexFn func(key NamespacedResource) (bool, error)
|
||||
}
|
||||
|
||||
func NewResourceServer(opts ResourceServerOptions) (*server, error) {
|
||||
@@ -343,23 +345,24 @@ func NewResourceServer(opts ResourceServerOptions) (*server, error) {
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
storageMetrics: opts.storageMetrics,
|
||||
indexMetrics: opts.IndexMetrics,
|
||||
maxPageSizeBytes: opts.MaxPageSizeBytes,
|
||||
reg: opts.Reg,
|
||||
queue: opts.QOSQueue,
|
||||
queueConfig: opts.QOSConfig,
|
||||
overridesService: opts.OverridesService,
|
||||
|
||||
artificialSuccessfulWriteDelay: opts.Search.IndexMinUpdateInterval,
|
||||
artificialSuccessfulWriteDelay: opts.IndexMinUpdateInterval,
|
||||
}
|
||||
|
||||
if opts.Search.Resources != nil {
|
||||
var err error
|
||||
s.search, err = newSearchSupport(opts.Search, s.backend, s.access, s.blob, opts.IndexMetrics, opts.OwnsIndexFn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
/*
|
||||
if opts.Search.Resources != nil {
|
||||
var err error
|
||||
s.search, err = newSearchSupport(opts.Search, s.backend, s.access, s.blob, opts.IndexMetrics, opts.OwnsIndexFn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
err := s.Init(ctx)
|
||||
if err != nil {
|
||||
@@ -377,7 +380,6 @@ type server struct {
|
||||
backend StorageBackend
|
||||
blob BlobSupport
|
||||
secure secrets.InlineSecureValueSupport
|
||||
search *searchSupport
|
||||
diagnostics resourcepb.DiagnosticsServer
|
||||
access claims.AccessClient
|
||||
writeHooks WriteAccessHooks
|
||||
@@ -424,11 +426,6 @@ func (s *server) Init(ctx context.Context) error {
|
||||
s.initErr = s.overridesService.init(ctx)
|
||||
}
|
||||
|
||||
// initialize the search index
|
||||
if s.initErr == nil && s.search != nil {
|
||||
s.initErr = s.search.init(ctx)
|
||||
}
|
||||
|
||||
// Start watching for changes
|
||||
if s.initErr == nil {
|
||||
s.initErr = s.initWatcher()
|
||||
@@ -453,10 +450,6 @@ func (s *server) Stop(ctx context.Context) error {
|
||||
}
|
||||
}
|
||||
|
||||
if s.search != nil {
|
||||
s.search.stop()
|
||||
}
|
||||
|
||||
if s.overridesService != nil {
|
||||
if err := s.overridesService.stop(ctx); err != nil {
|
||||
stopFailed = true
|
||||
@@ -1372,47 +1365,6 @@ func (s *server) Watch(req *resourcepb.WatchRequest, srv resourcepb.ResourceStor
|
||||
}
|
||||
}
|
||||
|
||||
func (s *server) Search(ctx context.Context, req *resourcepb.ResourceSearchRequest) (*resourcepb.ResourceSearchResponse, error) {
|
||||
if s.search == nil {
|
||||
return nil, fmt.Errorf("search index not configured")
|
||||
}
|
||||
|
||||
return s.search.Search(ctx, req)
|
||||
}
|
||||
|
||||
// GetStats implements ResourceServer.
|
||||
func (s *server) GetStats(ctx context.Context, req *resourcepb.ResourceStatsRequest) (*resourcepb.ResourceStatsResponse, error) {
|
||||
if err := s.Init(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if s.search == nil {
|
||||
// If the backend implements "GetStats", we can use it
|
||||
srv, ok := s.backend.(resourcepb.ResourceIndexServer)
|
||||
if ok {
|
||||
return srv.GetStats(ctx, req)
|
||||
}
|
||||
return nil, fmt.Errorf("search index not configured")
|
||||
}
|
||||
return s.search.GetStats(ctx, req)
|
||||
}
|
||||
|
||||
func (s *server) ListManagedObjects(ctx context.Context, req *resourcepb.ListManagedObjectsRequest) (*resourcepb.ListManagedObjectsResponse, error) {
|
||||
if s.search == nil {
|
||||
return nil, fmt.Errorf("search index not configured")
|
||||
}
|
||||
|
||||
return s.search.ListManagedObjects(ctx, req)
|
||||
}
|
||||
|
||||
func (s *server) CountManagedObjects(ctx context.Context, req *resourcepb.CountManagedObjectsRequest) (*resourcepb.CountManagedObjectsResponse, error) {
|
||||
if s.search == nil {
|
||||
return nil, fmt.Errorf("search index not configured")
|
||||
}
|
||||
|
||||
return s.search.CountManagedObjects(ctx, req)
|
||||
}
|
||||
|
||||
// IsHealthy implements ResourceServer.
|
||||
func (s *server) IsHealthy(ctx context.Context, req *resourcepb.HealthCheckRequest) (*resourcepb.HealthCheckResponse, error) {
|
||||
return s.diagnostics.IsHealthy(ctx, req)
|
||||
@@ -1568,14 +1520,6 @@ func (s *server) runInQueue(ctx context.Context, tenantID string, runnable func(
|
||||
}
|
||||
}
|
||||
|
||||
func (s *server) RebuildIndexes(ctx context.Context, req *resourcepb.RebuildIndexesRequest) (*resourcepb.RebuildIndexesResponse, error) {
|
||||
if s.search == nil {
|
||||
return nil, fmt.Errorf("search index not configured")
|
||||
}
|
||||
|
||||
return s.search.RebuildIndexes(ctx, req)
|
||||
}
|
||||
|
||||
func (s *server) checkQuota(ctx context.Context, nsr NamespacedResource) {
|
||||
span := trace.SpanFromContext(ctx)
|
||||
span.AddEvent("checkQuota", trace.WithAttributes(
|
||||
|
||||
@@ -11,6 +11,9 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/go-sql-driver/mysql"
|
||||
infraDB "github.com/grafana/grafana/pkg/infra/db"
|
||||
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/sql/db/dbimpl"
|
||||
"github.com/jackc/pgx/v5/pgconn"
|
||||
"github.com/lib/pq"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
@@ -44,10 +47,63 @@ const defaultPrunerHistoryLimit = 20
|
||||
|
||||
func ProvideStorageBackend(
|
||||
cfg *setting.Cfg,
|
||||
db infraDB.DB,
|
||||
tracer trace.Tracer,
|
||||
reg prometheus.Registerer,
|
||||
storageMetrics *resource.StorageMetrics,
|
||||
) (resource.StorageBackend, error) {
|
||||
// TODO: make this the central place to provide SQL backend
|
||||
// Currently it is skipped as we need to handle the cases of Diagnostics and Lifecycle
|
||||
return nil, nil
|
||||
// Create the resource DB
|
||||
eDB, err := dbimpl.ProvideResourceDB(db, cfg, tracer)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create resource DB: %w", err)
|
||||
}
|
||||
|
||||
// Check if HA is enabled
|
||||
isHA := isHighAvailabilityEnabled(
|
||||
cfg.SectionWithEnvOverrides("database"),
|
||||
cfg.SectionWithEnvOverrides("resource_api"),
|
||||
)
|
||||
|
||||
// Create the backend
|
||||
backend, err := NewBackend(BackendOptions{
|
||||
DBProvider: eDB,
|
||||
Reg: reg,
|
||||
IsHA: isHA,
|
||||
storageMetrics: storageMetrics,
|
||||
LastImportTimeMaxAge: cfg.MaxFileIndexAge,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create backend: %w", err)
|
||||
}
|
||||
|
||||
// Initialize the backend
|
||||
if err := backend.Init(context.Background()); err != nil {
|
||||
return nil, fmt.Errorf("failed to initialize backend: %w", err)
|
||||
}
|
||||
|
||||
return backend, nil
|
||||
}
|
||||
|
||||
// isHighAvailabilityEnabled determines if high availability mode should
|
||||
// be enabled based on database configuration. High availability is enabled
|
||||
// by default except for SQLite databases.
|
||||
func isHighAvailabilityEnabled(dbCfg, resourceAPICfg *setting.DynamicSection) bool {
|
||||
// If the resource API is using a non-SQLite database, we assume it's in HA mode.
|
||||
resourceDBType := resourceAPICfg.Key("db_type").String()
|
||||
if resourceDBType != "" && resourceDBType != migrator.SQLite {
|
||||
return true
|
||||
}
|
||||
|
||||
// Check in the config if HA is enabled - by default we always assume a HA setup.
|
||||
isHA := dbCfg.Key("high_availability").MustBool(true)
|
||||
|
||||
// SQLite is not possible to run in HA, so we force it to false.
|
||||
databaseType := dbCfg.Key("type").String()
|
||||
if databaseType == migrator.SQLite {
|
||||
isHA = false
|
||||
}
|
||||
|
||||
return isHA
|
||||
}
|
||||
|
||||
type Backend interface {
|
||||
|
||||
320
pkg/storage/unified/sql/search_service.go
Normal file
320
pkg/storage/unified/sql/search_service.go
Normal file
@@ -0,0 +1,320 @@
|
||||
package sql
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"hash/fnv"
|
||||
"net/http"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/search"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"google.golang.org/grpc/health/grpc_health_v1"
|
||||
|
||||
"github.com/grafana/dskit/kv"
|
||||
"github.com/grafana/dskit/ring"
|
||||
"github.com/grafana/dskit/services"
|
||||
|
||||
infraDB "github.com/grafana/grafana/pkg/infra/db"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/modules"
|
||||
"github.com/grafana/grafana/pkg/services/authz"
|
||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||
"github.com/grafana/grafana/pkg/services/grpcserver"
|
||||
"github.com/grafana/grafana/pkg/services/grpcserver/interceptors"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resource"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resource/grpc"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
||||
)
|
||||
|
||||
// operation used by the search-servers to check if they own the namespace
|
||||
var (
|
||||
searchOwnerRead = ring.NewOp([]ring.InstanceState{ring.JOINING, ring.ACTIVE, ring.LEAVING}, nil)
|
||||
)
|
||||
|
||||
type searchService struct {
|
||||
*services.BasicService
|
||||
|
||||
backend resource.StorageBackend
|
||||
cfg *setting.Cfg
|
||||
features featuremgmt.FeatureToggles
|
||||
db infraDB.DB
|
||||
stopCh chan struct{}
|
||||
stoppedCh chan error
|
||||
handler grpcserver.Provider
|
||||
tracing trace.Tracer
|
||||
authenticator func(ctx context.Context) (context.Context, error)
|
||||
httpServerRouter *mux.Router
|
||||
|
||||
log log.Logger
|
||||
reg prometheus.Registerer
|
||||
docBuilders resource.DocumentBuilderSupplier
|
||||
indexMetrics *resource.BleveIndexMetrics
|
||||
searchRing *ring.Ring
|
||||
|
||||
// Ring lifecycle and sharding support
|
||||
ringLifecycler *ring.BasicLifecycler
|
||||
|
||||
// Subservices manager
|
||||
subservices *services.Manager
|
||||
subservicesWatcher *services.FailureWatcher
|
||||
hasSubservices bool
|
||||
}
|
||||
|
||||
func ProvideUnifiedSearchGrpcService(
|
||||
cfg *setting.Cfg,
|
||||
features featuremgmt.FeatureToggles,
|
||||
db infraDB.DB,
|
||||
log log.Logger,
|
||||
reg prometheus.Registerer,
|
||||
docBuilders resource.DocumentBuilderSupplier,
|
||||
indexMetrics *resource.BleveIndexMetrics,
|
||||
searchRing *ring.Ring,
|
||||
memberlistKVConfig kv.Config,
|
||||
backend resource.StorageBackend,
|
||||
httpServerRouter *mux.Router,
|
||||
) (UnifiedGrpcService, error) {
|
||||
var err error
|
||||
tracer := otel.Tracer("unified-search-server")
|
||||
|
||||
authn := NewAuthenticatorWithFallback(cfg, reg, tracer, func(ctx context.Context) (context.Context, error) {
|
||||
auth := grpc.Authenticator{Tracer: tracer}
|
||||
return auth.Authenticate(ctx)
|
||||
})
|
||||
|
||||
s := &searchService{
|
||||
backend: backend,
|
||||
cfg: cfg,
|
||||
features: features,
|
||||
stopCh: make(chan struct{}),
|
||||
stoppedCh: make(chan error, 1),
|
||||
authenticator: authn,
|
||||
tracing: tracer,
|
||||
db: db,
|
||||
log: log,
|
||||
reg: reg,
|
||||
docBuilders: docBuilders,
|
||||
indexMetrics: indexMetrics,
|
||||
searchRing: searchRing,
|
||||
httpServerRouter: httpServerRouter,
|
||||
subservicesWatcher: services.NewFailureWatcher(),
|
||||
}
|
||||
|
||||
subservices := []services.Service{}
|
||||
if cfg.EnableSharding {
|
||||
ringStore, err := kv.NewClient(
|
||||
memberlistKVConfig,
|
||||
ring.GetCodec(),
|
||||
kv.RegistererWithKVName(reg, resource.RingName),
|
||||
log,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create KV store client: %s", err)
|
||||
}
|
||||
|
||||
lifecyclerCfg, err := toLifecyclerConfig(cfg, log)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to initialize search-ring lifecycler config: %s", err)
|
||||
}
|
||||
|
||||
// Define lifecycler delegates in reverse order (last to be called defined first because they're
|
||||
// chained via "next delegate").
|
||||
delegate := ring.BasicLifecyclerDelegate(ring.NewInstanceRegisterDelegate(ring.JOINING, resource.RingNumTokens))
|
||||
delegate = ring.NewLeaveOnStoppingDelegate(delegate, log)
|
||||
delegate = ring.NewAutoForgetDelegate(resource.RingHeartbeatTimeout*2, delegate, log)
|
||||
|
||||
s.ringLifecycler, err = ring.NewBasicLifecycler(
|
||||
lifecyclerCfg,
|
||||
resource.RingName,
|
||||
resource.RingKey,
|
||||
ringStore,
|
||||
delegate,
|
||||
log,
|
||||
reg,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to initialize search-ring lifecycler: %s", err)
|
||||
}
|
||||
|
||||
s.ringLifecycler.SetKeepInstanceInTheRingOnShutdown(true)
|
||||
subservices = append(subservices, s.ringLifecycler)
|
||||
}
|
||||
|
||||
if len(subservices) > 0 {
|
||||
s.hasSubservices = true
|
||||
s.subservices, err = services.NewManager(subservices...)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create subservices manager: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// This will be used when running as a dskit service
|
||||
s.BasicService = services.NewBasicService(s.starting, s.running, s.stopping).WithName(modules.SearchServer)
|
||||
|
||||
// Register HTTP endpoints if router is provided
|
||||
s.RegisterHTTPEndpoints(httpServerRouter)
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (s *searchService) PrepareDownscale(w http.ResponseWriter, r *http.Request) {
|
||||
switch r.Method {
|
||||
case http.MethodPost:
|
||||
s.log.Info("Preparing for downscale. Will not keep instance in ring on shutdown.")
|
||||
s.ringLifecycler.SetKeepInstanceInTheRingOnShutdown(false)
|
||||
case http.MethodDelete:
|
||||
s.log.Info("Downscale canceled. Will keep instance in ring on shutdown.")
|
||||
s.ringLifecycler.SetKeepInstanceInTheRingOnShutdown(true)
|
||||
case http.MethodGet:
|
||||
// used for delayed downscale use case, which we don't support. Leaving here for completion sake
|
||||
s.log.Info("Received GET request for prepare-downscale. Behavior not implemented.")
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func (s *searchService) OwnsIndex(key resource.NamespacedResource) (bool, error) {
|
||||
if s.searchRing == nil {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
if st := s.searchRing.State(); st != services.Running {
|
||||
return false, fmt.Errorf("ring is not Running: %s", st)
|
||||
}
|
||||
|
||||
ringHasher := fnv.New32a()
|
||||
_, err := ringHasher.Write([]byte(key.Namespace))
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("error hashing namespace: %w", err)
|
||||
}
|
||||
|
||||
rs, err := s.searchRing.GetWithOptions(ringHasher.Sum32(), searchOwnerRead, ring.WithReplicationFactor(s.searchRing.ReplicationFactor()))
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("error getting replicaset from ring: %w", err)
|
||||
}
|
||||
|
||||
return rs.Includes(s.ringLifecycler.GetInstanceAddr()), nil
|
||||
}
|
||||
|
||||
func (s *searchService) starting(ctx context.Context) error {
	if s.hasSubservices {
		s.subservicesWatcher.WatchManager(s.subservices)
		if err := services.StartManagerAndAwaitHealthy(ctx, s.subservices); err != nil {
			return fmt.Errorf("failed to start subservices: %w", err)
		}
	}

	authzClient, err := authz.ProvideStandaloneAuthZClient(s.cfg, s.features, s.tracing, s.reg)
	if err != nil {
		return err
	}

	// Create search options for the search server
	searchOptions, err := search.NewSearchOptions(s.features, s.cfg, s.docBuilders, s.indexMetrics, s.OwnsIndex)
	if err != nil {
		return err
	}

	// Create the search server
	searchServer, err := NewSearchServer(SearchServerOptions{
		Backend: s.backend,
		DB: s.db,
		Cfg: s.cfg,
		Tracer: s.tracing,
		Reg: s.reg,
		AccessClient: authzClient,
		SearchOptions: searchOptions,
		IndexMetrics: s.indexMetrics,
		OwnsIndexFn: s.OwnsIndex,
	})
	if err != nil {
		return err
	}

	s.handler, err = grpcserver.ProvideService(s.cfg, s.features, interceptors.AuthenticatorFunc(s.authenticator), s.tracing, prometheus.DefaultRegisterer)
	if err != nil {
		return err
	}

	healthService, err := resource.ProvideHealthService(searchServer)
	if err != nil {
		return err
	}

	srv := s.handler.GetServer()
	// Register search services
	resourcepb.RegisterResourceIndexServer(srv, searchServer)
	resourcepb.RegisterManagedObjectIndexServer(srv, searchServer)
	resourcepb.RegisterDiagnosticsServer(srv, searchServer)
	grpc_health_v1.RegisterHealthServer(srv, healthService)

	// register reflection service
	_, err = grpcserver.ProvideReflectionService(s.cfg, s.handler)
	if err != nil {
		return err
	}

	if s.cfg.EnableSharding {
		s.log.Info("waiting until search server is JOINING in the ring")
		lfcCtx, cancel := context.WithTimeout(context.Background(), s.cfg.ResourceServerJoinRingTimeout)
		defer cancel()
		if err := ring.WaitInstanceState(lfcCtx, s.searchRing, s.ringLifecycler.GetInstanceID(), ring.JOINING); err != nil {
			return fmt.Errorf("error switching to JOINING in the ring: %s", err)
		}
		s.log.Info("search server is JOINING in the ring")

		if err := s.ringLifecycler.ChangeState(ctx, ring.ACTIVE); err != nil {
			return fmt.Errorf("error switching to ACTIVE in the ring: %s", err)
		}
		s.log.Info("search server is ACTIVE in the ring")
	}

	// start the gRPC server
	go func() {
		err := s.handler.Run(ctx)
		if err != nil {
			s.stoppedCh <- err
		} else {
			s.stoppedCh <- nil
		}
	}()
	return nil
}

// GetAddress returns the address of the gRPC server.
func (s *searchService) GetAddress() string {
	return s.handler.GetAddress()
}

func (s *searchService) running(ctx context.Context) error {
	select {
	case err := <-s.stoppedCh:
		if err != nil && !errors.Is(err, context.Canceled) {
			return err
		}
	case err := <-s.subservicesWatcher.Chan():
		return fmt.Errorf("subservice failure: %w", err)
	case <-ctx.Done():
		close(s.stopCh)
	}
	return nil
}

func (s *searchService) stopping(_ error) error {
	if s.hasSubservices {
		err := services.StopManagerAndAwaitStopped(context.Background(), s.subservices)
		if err != nil {
			return fmt.Errorf("failed to stop subservices: %w", err)
		}
	}
	return nil
}

func (s *searchService) RegisterHTTPEndpoints(httpServerRouter *mux.Router) {
	if httpServerRouter != nil && s.cfg.EnableSharding {
		httpServerRouter.Path("/prepare-downscale").Methods("GET", "POST", "DELETE").Handler(http.HandlerFunc(s.PrepareDownscale))
	}
}
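Both searchService above and the storageService introduced below embed dskit's services.BasicService, so their lifecycle is the starting/running/stopping triple wired in the constructors. A minimal sketch of that pattern, assuming github.com/grafana/dskit is on the module path:

// Minimal sketch of the dskit service lifecycle used by these services:
// NewBasicService wires starting/running/stopping callbacks and the caller
// (or a services.Manager) drives the state machine.
package main

import (
	"context"
	"fmt"

	"github.com/grafana/dskit/services"
)

func main() {
	svc := services.NewBasicService(
		func(ctx context.Context) error { fmt.Println("starting: init deps"); return nil },
		func(ctx context.Context) error { <-ctx.Done(); return nil }, // run until stopped
		func(_ error) error { fmt.Println("stopping: release deps"); return nil },
	).WithName("example-server")

	ctx, cancel := context.WithCancel(context.Background())
	if err := services.StartAndAwaitRunning(ctx, svc); err != nil {
		panic(err)
	}
	cancel() // ask the service to stop
	_ = services.StopAndAwaitTerminated(context.Background(), svc)
}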
@@ -16,7 +16,6 @@ import (
	secrets "github.com/grafana/grafana/pkg/registry/apis/secret/contracts"
	inlinesecurevalue "github.com/grafana/grafana/pkg/registry/apis/secret/inline"
	"github.com/grafana/grafana/pkg/services/featuremgmt"
	"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
	"github.com/grafana/grafana/pkg/setting"
	"github.com/grafana/grafana/pkg/storage/unified/resource"
	"github.com/grafana/grafana/pkg/storage/unified/sql/db/dbimpl"
@@ -30,8 +29,21 @@ type QOSEnqueueDequeuer interface {
	Dequeue(ctx context.Context) (func(), error)
}

// ServerOptions contains the options for creating a new ResourceServer
type ServerOptions struct {
// SearchServerOptions contains the options for creating a new SearchServer
type SearchServerOptions struct {
	Backend resource.StorageBackend
	DB infraDB.DB
	Cfg *setting.Cfg
	Tracer trace.Tracer
	Reg prometheus.Registerer
	AccessClient types.AccessClient
	SearchOptions resource.SearchOptions
	IndexMetrics *resource.BleveIndexMetrics
	OwnsIndexFn func(key resource.NamespacedResource) (bool, error)
}

// StorageServerOptions contains the options for creating a storage-only server (without search)
type StorageServerOptions struct {
	Backend resource.StorageBackend
	OverridesService *resource.OverridesService
	DB infraDB.DB
@@ -39,16 +51,66 @@ type ServerOptions struct {
	Tracer trace.Tracer
	Reg prometheus.Registerer
	AccessClient types.AccessClient
	SearchOptions resource.SearchOptions
	StorageMetrics *resource.StorageMetrics
	IndexMetrics *resource.BleveIndexMetrics
	Features featuremgmt.FeatureToggles
	QOSQueue QOSEnqueueDequeuer
	SecureValues secrets.InlineSecureValueSupport
	OwnsIndexFn func(key resource.NamespacedResource) (bool, error)
}

func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
// NewSearchServer creates a new SearchServer with the given options.
// This can be used to create a standalone search server or to create a search server
// that will be passed to NewResourceServer.
//
// Important: When running in monolith mode, the backend should be provided by the caller
// to avoid duplicate metrics registration. Only in standalone microservice mode should
// this function create its own backend.
func NewSearchServer(opts SearchServerOptions) (resource.SearchServer, error) {
	backend := opts.Backend
	if backend == nil {
		eDB, err := dbimpl.ProvideResourceDB(opts.DB, opts.Cfg, opts.Tracer)
		if err != nil {
			return nil, err
		}

		isHA := isHighAvailabilityEnabled(opts.Cfg.SectionWithEnvOverrides("database"),
			opts.Cfg.SectionWithEnvOverrides("resource_api"))

		b, err := NewBackend(BackendOptions{
			DBProvider: eDB,
			Reg: opts.Reg,
			IsHA: isHA,
			LastImportTimeMaxAge: opts.SearchOptions.MaxIndexAge,
		})
		if err != nil {
			return nil, err
		}

		// Initialize the backend before creating search server
		if err := b.Init(context.Background()); err != nil {
			return nil, fmt.Errorf("failed to initialize backend: %w", err)
		}
		backend = b
	}

	search, err := resource.NewSearchServer(opts.SearchOptions, backend, opts.AccessClient, nil, opts.IndexMetrics, opts.OwnsIndexFn)
	if err != nil {
		return nil, fmt.Errorf("failed to create search server: %w", err)
	}

	if err := search.Init(context.Background()); err != nil {
		return nil, fmt.Errorf("failed to initialize search server: %w", err)
	}

	return search, nil
}

// NewStorageServer creates a storage-only server without search capabilities.
// Use this when you want to run storage and search as separate services.
//
// Important: When running in monolith mode, the backend should be provided by the caller
// to avoid duplicate metrics registration. Only in standalone microservice mode should
// this function create its own backend.
func NewStorageServer(opts StorageServerOptions) (resource.ResourceServer, error) {
	apiserverCfg := opts.Cfg.SectionWithEnvOverrides("grafana-apiserver")

	if opts.SecureValues == nil && opts.Cfg != nil && opts.Cfg.SecretsManagement.GrpcClientEnable {
@@ -92,7 +154,6 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {

	if opts.Backend != nil {
		serverOptions.Backend = opts.Backend
		// TODO: we should probably have a proper interface for diagnostics/lifecycle
	} else {
		eDB, err := dbimpl.ProvideResourceDB(opts.DB, opts.Cfg, opts.Tracer)
		if err != nil {
@@ -130,7 +191,6 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
			return nil, fmt.Errorf("failed to create resource version manager: %w", err)
		}

		// TODO add config to decide whether to pass RvManager or not
		kvBackendOpts.RvManager = rvManager
		kvBackend, err := resource.NewKVStorageBackend(kvBackendOpts)
		if err != nil {
@@ -148,7 +208,7 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
			Reg: opts.Reg,
			IsHA: isHA,
			storageMetrics: opts.StorageMetrics,
			LastImportTimeMaxAge: opts.SearchOptions.MaxIndexAge, // No need to keep last_import_times older than max index age.
			LastImportTimeMaxAge: opts.Cfg.MaxFileIndexAge,
		})
		if err != nil {
			return nil, err
@@ -159,33 +219,15 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
		}
	}

	serverOptions.Search = opts.SearchOptions
	serverOptions.IndexMetrics = opts.IndexMetrics
	// Initialize the backend before creating server
	if serverOptions.Lifecycle != nil {
		if err := serverOptions.Lifecycle.Init(context.Background()); err != nil {
			return nil, fmt.Errorf("failed to initialize backend: %w", err)
		}
	}

	serverOptions.QOSQueue = opts.QOSQueue
	serverOptions.OwnsIndexFn = opts.OwnsIndexFn
	serverOptions.OverridesService = opts.OverridesService

	return resource.NewResourceServer(serverOptions)
}

// isHighAvailabilityEnabled determines if high availability mode should
// be enabled based on database configuration. High availability is enabled
// by default except for SQLite databases.
func isHighAvailabilityEnabled(dbCfg, resourceAPICfg *setting.DynamicSection) bool {
	// If the resource API is using a non-SQLite database, we assume it's in HA mode.
	resourceDBType := resourceAPICfg.Key("db_type").String()
	if resourceDBType != "" && resourceDBType != migrator.SQLite {
		return true
	}

	// Check in the config if HA is enabled - by default we always assume a HA setup.
	isHA := dbCfg.Key("high_availability").MustBool(true)

	// SQLite is not possible to run in HA, so we force it to false.
	databaseType := dbCfg.Key("type").String()
	if databaseType == migrator.SQLite {
		isHA = false
	}

	return isHA
}
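The decision order in isHighAvailabilityEnabled is: an explicitly configured non-SQLite resource_api db_type forces HA on; otherwise the database section's high_availability flag (default true) applies, and SQLite forces it off. A simplified model over plain strings (the real function reads *setting.DynamicSection; "sqlite3" stands in for migrator.SQLite):

// Simplified model of the HA decision above, with plain parameters in place
// of config sections. Only the decision order is taken from the code.
package main

import "fmt"

func haEnabled(resourceDBType, dbType string, highAvailability bool) bool {
	if resourceDBType != "" && resourceDBType != "sqlite3" {
		return true // resource_api uses a non-SQLite database: assume HA
	}
	isHA := highAvailability // default true in the real config
	if dbType == "sqlite3" {
		isHA = false // SQLite cannot run in HA
	}
	return isHA
}

func main() {
	fmt.Println(haEnabled("", "sqlite3", true))         // false
	fmt.Println(haEnabled("", "mysql", true))           // true
	fmt.Println(haEnabled("postgres", "sqlite3", true)) // true: resource_api overrides
}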
pkg/storage/unified/sql/storage_service.go (new file, 232 lines)
@@ -0,0 +1,232 @@
package sql

import (
	"context"
	"errors"
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/trace"
	"google.golang.org/grpc/health/grpc_health_v1"

	"github.com/grafana/dskit/services"

	infraDB "github.com/grafana/grafana/pkg/infra/db"
	"github.com/grafana/grafana/pkg/infra/log"
	"github.com/grafana/grafana/pkg/modules"
	"github.com/grafana/grafana/pkg/services/authz"
	"github.com/grafana/grafana/pkg/services/featuremgmt"
	"github.com/grafana/grafana/pkg/services/grpcserver"
	"github.com/grafana/grafana/pkg/services/grpcserver/interceptors"
	"github.com/grafana/grafana/pkg/setting"
	"github.com/grafana/grafana/pkg/storage/unified/resource"
	"github.com/grafana/grafana/pkg/storage/unified/resource/grpc"
	"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
	"github.com/grafana/grafana/pkg/util/scheduler"
)

type storageService struct {
	*services.BasicService

	backend resource.StorageBackend
	cfg *setting.Cfg
	features featuremgmt.FeatureToggles
	db infraDB.DB
	stopCh chan struct{}
	stoppedCh chan error

	handler grpcserver.Provider

	tracing trace.Tracer

	authenticator func(ctx context.Context) (context.Context, error)

	log log.Logger
	reg prometheus.Registerer
	storageMetrics *resource.StorageMetrics

	queue QOSEnqueueDequeuer
	scheduler *scheduler.Scheduler

	// Subservices manager
	subservices *services.Manager
	subservicesWatcher *services.FailureWatcher
	hasSubservices bool
}

func ProvideStorageService(
	cfg *setting.Cfg,
	features featuremgmt.FeatureToggles,
	db infraDB.DB,
	log log.Logger,
	reg prometheus.Registerer,
	storageMetrics *resource.StorageMetrics,
	backend resource.StorageBackend,
) (UnifiedGrpcService, error) {
	var err error
	tracer := otel.Tracer("unified-storage-server")

	authn := NewAuthenticatorWithFallback(cfg, reg, tracer, func(ctx context.Context) (context.Context, error) {
		auth := grpc.Authenticator{Tracer: tracer}
		return auth.Authenticate(ctx)
	})

	s := &storageService{
		backend: backend,
		cfg: cfg,
		features: features,
		stopCh: make(chan struct{}),
		stoppedCh: make(chan error, 1),
		authenticator: authn,
		tracing: tracer,
		db: db,
		log: log,
		reg: reg,
		storageMetrics: storageMetrics,
		subservicesWatcher: services.NewFailureWatcher(),
	}

	subservices := []services.Service{}

	// Setup QOS if enabled
	if cfg.QOSEnabled {
		qosReg := prometheus.WrapRegistererWithPrefix("resource_server_qos_", reg)
		queue := scheduler.NewQueue(&scheduler.QueueOptions{
			MaxSizePerTenant: cfg.QOSMaxSizePerTenant,
			Registerer: qosReg,
		})
		sched, err := scheduler.NewScheduler(queue, &scheduler.Config{
			NumWorkers: cfg.QOSNumberWorker,
			Logger: log,
		})
		if err != nil {
			return nil, fmt.Errorf("failed to create qos scheduler: %s", err)
		}

		s.queue = queue
		s.scheduler = sched
		subservices = append(subservices, s.queue, s.scheduler)
	}

	if len(subservices) > 0 {
		s.hasSubservices = true
		s.subservices, err = services.NewManager(subservices...)
		if err != nil {
			return nil, fmt.Errorf("failed to create subservices manager: %w", err)
		}
	}

	// This will be used when running as a dskit service
	s.BasicService = services.NewBasicService(s.starting, s.running, s.stopping).WithName(modules.StorageServer)

	return s, nil
}

func (s *storageService) starting(ctx context.Context) error {
	if s.hasSubservices {
		s.subservicesWatcher.WatchManager(s.subservices)
		if err := services.StartManagerAndAwaitHealthy(ctx, s.subservices); err != nil {
			return fmt.Errorf("failed to start subservices: %w", err)
		}
	}

	authzClient, err := authz.ProvideStandaloneAuthZClient(s.cfg, s.features, s.tracing, s.reg)
	if err != nil {
		return err
	}

	// Setup overrides service if enabled
	var overridesSvc *resource.OverridesService
	if s.cfg.OverridesFilePath != "" {
		overridesSvc, err = resource.NewOverridesService(context.Background(), s.log, s.reg, s.tracing, resource.ReloadOptions{
			FilePath: s.cfg.OverridesFilePath,
			ReloadPeriod: s.cfg.OverridesReloadInterval,
		})
		if err != nil {
			return err
		}
	}

	// Create the storage server
	storageServer, err := NewStorageServer(StorageServerOptions{
		Backend: s.backend,
		OverridesService: overridesSvc,
		DB: s.db,
		Cfg: s.cfg,
		Tracer: s.tracing,
		Reg: s.reg,
		AccessClient: authzClient,
		StorageMetrics: s.storageMetrics,
		Features: s.features,
		QOSQueue: s.queue,
	})
	if err != nil {
		return err
	}

	s.handler, err = grpcserver.ProvideService(s.cfg, s.features, interceptors.AuthenticatorFunc(s.authenticator), s.tracing, prometheus.DefaultRegisterer)
	if err != nil {
		return err
	}

	healthService, err := resource.ProvideHealthService(storageServer)
	if err != nil {
		return err
	}

	srv := s.handler.GetServer()
	// Register storage services
	resourcepb.RegisterResourceStoreServer(srv, storageServer)
	resourcepb.RegisterBulkStoreServer(srv, storageServer)
	resourcepb.RegisterBlobStoreServer(srv, storageServer)
	resourcepb.RegisterDiagnosticsServer(srv, storageServer)
	resourcepb.RegisterQuotasServer(srv, storageServer)
	grpc_health_v1.RegisterHealthServer(srv, healthService)

	// register reflection service
	_, err = grpcserver.ProvideReflectionService(s.cfg, s.handler)
	if err != nil {
		return err
	}

	// start the gRPC server
	go func() {
		err := s.handler.Run(ctx)
		if err != nil {
			s.stoppedCh <- err
		} else {
			s.stoppedCh <- nil
		}
	}()
	return nil
}

// GetAddress returns the address of the gRPC server.
func (s *storageService) GetAddress() string {
	return s.handler.GetAddress()
}

func (s *storageService) running(ctx context.Context) error {
	select {
	case err := <-s.stoppedCh:
		if err != nil && !errors.Is(err, context.Canceled) {
			return err
		}
	case err := <-s.subservicesWatcher.Chan():
		return fmt.Errorf("subservice failure: %w", err)
	case <-ctx.Done():
		close(s.stopCh)
	}
	return nil
}

func (s *storageService) stopping(_ error) error {
	if s.hasSubservices {
		err := services.StopManagerAndAwaitStopped(context.Background(), s.subservices)
		if err != nil {
			return fmt.Errorf("failed to stop subservices: %w", err)
		}
	}
	return nil
}
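ProvideStorageService hands the QoS queue a registerer wrapped with prometheus.WrapRegistererWithPrefix("resource_server_qos_", reg), so every queue metric is exposed with that prefix. A small runnable sketch of what the wrapping does; the gauge name is a hypothetical example, not one defined by this change:

// Sketch of the registerer wrapping used for the QoS queue: the wrapped
// registerer prepends a prefix to every metric registered through it.
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	reg := prometheus.NewRegistry()
	qosReg := prometheus.WrapRegistererWithPrefix("resource_server_qos_", reg)

	queueLen := prometheus.NewGauge(prometheus.GaugeOpts{
		Name: "queue_length", // exposed as resource_server_qos_queue_length
		Help: "Items currently waiting in the per-tenant queue (hypothetical).",
	})
	qosReg.MustRegister(queueLen)
	queueLen.Set(3)

	families, _ := reg.Gather()
	for _, mf := range families {
		fmt.Println(mf.GetName()) // resource_server_qos_queue_length
	}
}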
@@ -12,6 +12,7 @@ import (
	"time"

	"github.com/gorilla/mux"
	"github.com/grafana/grafana/pkg/storage/unified/search"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"go.opentelemetry.io/otel"
@@ -36,15 +37,17 @@ import (
	"github.com/grafana/grafana/pkg/storage/unified/resource"
	"github.com/grafana/grafana/pkg/storage/unified/resource/grpc"
	"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
	"github.com/grafana/grafana/pkg/storage/unified/search"
	"github.com/grafana/grafana/pkg/storage/unified/sql/db/dbimpl"
	"github.com/grafana/grafana/pkg/util/scheduler"
)

var (
	_ UnifiedStorageGrpcService = (*service)(nil)
	_ UnifiedGrpcService = (*service)(nil)
	_ UnifiedGrpcService = (*searchService)(nil)
	_ UnifiedGrpcService = (*storageService)(nil)
)

type UnifiedStorageGrpcService interface {
type UnifiedGrpcService interface {
	services.NamedService

	// Return the address where this service is running
@@ -54,38 +57,40 @@ type UnifiedStorageGrpcService interface {
type service struct {
	*services.BasicService

	// Subservices manager
	subservices *services.Manager
	subservicesWatcher *services.FailureWatcher
	hasSubservices bool

	backend resource.StorageBackend
	cfg *setting.Cfg
	features featuremgmt.FeatureToggles
	db infraDB.DB
	stopCh chan struct{}
	stoppedCh chan error

	handler grpcserver.Provider

	tracing trace.Tracer

	authenticator func(ctx context.Context) (context.Context, error)

	backend resource.StorageBackend
	cfg *setting.Cfg
	features featuremgmt.FeatureToggles
	stopCh chan struct{}
	stoppedCh chan error
	authenticator func(context.Context) (context.Context, error)
	tracing trace.Tracer
	db infraDB.DB
	log log.Logger
	reg prometheus.Registerer
	docBuilders resource.DocumentBuilderSupplier
	storageMetrics *resource.StorageMetrics
	indexMetrics *resource.BleveIndexMetrics

	docBuilders resource.DocumentBuilderSupplier

	searchRing *ring.Ring

	// Handler for the gRPC server
	handler grpcserver.Provider

	// Ring lifecycle and sharding support
	ringLifecycler *ring.BasicLifecycler

	// QoS support
	queue QOSEnqueueDequeuer
	scheduler *scheduler.Scheduler

	// Subservices management
	hasSubservices bool
	subservices *services.Manager
	subservicesWatcher *services.FailureWatcher
}

// ProvideUnifiedStorageGrpcService provides a combined storage and search service running on the same gRPC server.
// This is used when running Grafana as a monolith where both services share the same process.
// Each service (storage and search) maintains its own lifecycle but shares the gRPC server.
func ProvideUnifiedStorageGrpcService(
	cfg *setting.Cfg,
	features featuremgmt.FeatureToggles,
@@ -99,9 +104,9 @@ func ProvideUnifiedStorageGrpcService(
	memberlistKVConfig kv.Config,
	httpServerRouter *mux.Router,
	backend resource.StorageBackend,
) (UnifiedStorageGrpcService, error) {
) (UnifiedGrpcService, error) {
	var err error
	tracer := otel.Tracer("unified-storage")
	tracer := otel.Tracer("unified-storage-combined")

	// FIXME: This is a temporary solution while we are migrating to the new authn interceptor
	// grpcutils.NewGrpcAuthenticator should be used instead.
@@ -178,7 +183,7 @@ func ProvideUnifiedStorageGrpcService(
			MaxSizePerTenant: cfg.QOSMaxSizePerTenant,
			Registerer: qosReg,
		})
		scheduler, err := scheduler.NewScheduler(queue, &scheduler.Config{
		sched, err := scheduler.NewScheduler(queue, &scheduler.Config{
			NumWorkers: cfg.QOSNumberWorker,
			Logger: log,
		})
@@ -187,7 +192,7 @@ func ProvideUnifiedStorageGrpcService(
		}

		s.queue = queue
		s.scheduler = scheduler
		s.scheduler = sched
		subservices = append(subservices, s.queue, s.scheduler)
	}

@@ -200,6 +205,7 @@ func ProvideUnifiedStorageGrpcService(
	}

	// This will be used when running as a dskit service
	// Note: We use StorageServer as the module name for backward compatibility
	s.BasicService = services.NewBasicService(s.starting, s.running, s.stopping).WithName(modules.StorageServer)

	return s, nil
@@ -220,11 +226,6 @@ func (s *service) PrepareDownscale(w http.ResponseWriter, r *http.Request) {
	}
}

var (
	// operation used by the search-servers to check if they own the namespace
	searchOwnerRead = ring.NewOp([]ring.InstanceState{ring.JOINING, ring.ACTIVE, ring.LEAVING}, nil)
)

func (s *service) OwnsIndex(key resource.NamespacedResource) (bool, error) {
	if s.searchRing == nil {
		return true, nil
@@ -261,59 +262,108 @@ func (s *service) starting(ctx context.Context) error {
		return err
	}

	searchOptions, err := search.NewSearchOptions(s.features, s.cfg, s.docBuilders, s.indexMetrics, s.OwnsIndex)
	if err != nil {
		return err
	}

	serverOptions := ServerOptions{
		Backend: s.backend,
		DB: s.db,
		Cfg: s.cfg,
		Tracer: s.tracing,
		Reg: s.reg,
		AccessClient: authzClient,
		SearchOptions: searchOptions,
		StorageMetrics: s.storageMetrics,
		IndexMetrics: s.indexMetrics,
		Features: s.features,
		QOSQueue: s.queue,
		OwnsIndexFn: s.OwnsIndex,
	}

	// Setup overrides service if enabled
	var overridesSvc *resource.OverridesService
	if s.cfg.OverridesFilePath != "" {
		overridesSvc, err := resource.NewOverridesService(context.Background(), s.log, s.reg, s.tracing, resource.ReloadOptions{
		overridesSvc, err = resource.NewOverridesService(context.Background(), s.log, s.reg, s.tracing, resource.ReloadOptions{
			FilePath: s.cfg.OverridesFilePath,
			ReloadPeriod: s.cfg.OverridesReloadInterval,
		})
		if err != nil {
			return err
		}
		serverOptions.OverridesService = overridesSvc
	}

	server, err := NewResourceServer(serverOptions)
	// Ensure we have a backend - create one if needed
	// This is critical: we create the backend ONCE and share it between search and storage servers
	// to avoid duplicate metrics registration
	backend := s.backend
	if backend == nil {
		eDB, err := dbimpl.ProvideResourceDB(s.db, s.cfg, s.tracing)
		if err != nil {
			return fmt.Errorf("failed to create resource DB: %w", err)
		}

		isHA := isHighAvailabilityEnabled(s.cfg.SectionWithEnvOverrides("database"),
			s.cfg.SectionWithEnvOverrides("resource_api"))

		b, err := NewBackend(BackendOptions{
			DBProvider: eDB,
			Reg: s.reg,
			IsHA: isHA,
			storageMetrics: s.storageMetrics,
			LastImportTimeMaxAge: s.cfg.MaxFileIndexAge,
		})
		if err != nil {
			return fmt.Errorf("failed to create backend: %w", err)
		}

		// Initialize the backend
		if err := b.Init(context.Background()); err != nil {
			return fmt.Errorf("failed to initialize backend: %w", err)
		}
		backend = b
	}

	// Create search options for the search server
	searchOptions, err := search.NewSearchOptions(s.features, s.cfg, s.docBuilders, s.indexMetrics, s.OwnsIndex)
	if err != nil {
		return err
	}

	// Create the search server - pass the shared backend
	searchServer, err := NewSearchServer(SearchServerOptions{
		Backend: backend, // Use the shared backend
		DB: s.db,
		Cfg: s.cfg,
		Tracer: s.tracing,
		Reg: s.reg,
		AccessClient: authzClient,
		SearchOptions: searchOptions,
		IndexMetrics: s.indexMetrics,
		OwnsIndexFn: s.OwnsIndex,
	})
	if err != nil {
		return err
	}

	// Create the storage server - pass the shared backend
	storageServer, err := NewStorageServer(StorageServerOptions{
		Backend: backend, // Use the shared backend
		OverridesService: overridesSvc,
		DB: s.db,
		Cfg: s.cfg,
		Tracer: s.tracing,
		Reg: s.reg,
		AccessClient: authzClient,
		StorageMetrics: s.storageMetrics,
		Features: s.features,
		QOSQueue: s.queue,
	})
	if err != nil {
		return err
	}

	s.handler, err = grpcserver.ProvideService(s.cfg, s.features, interceptors.AuthenticatorFunc(s.authenticator), s.tracing, prometheus.DefaultRegisterer)
	if err != nil {
		return err
	}

	healthService, err := resource.ProvideHealthService(server)
	healthService, err := resource.ProvideHealthService(storageServer)
	if err != nil {
		return err
	}

	srv := s.handler.GetServer()
	resourcepb.RegisterResourceStoreServer(srv, server)
	resourcepb.RegisterBulkStoreServer(srv, server)
	resourcepb.RegisterResourceIndexServer(srv, server)
	resourcepb.RegisterManagedObjectIndexServer(srv, server)
	resourcepb.RegisterBlobStoreServer(srv, server)
	resourcepb.RegisterDiagnosticsServer(srv, server)
	resourcepb.RegisterQuotasServer(srv, server)
	// Register storage services
	resourcepb.RegisterResourceStoreServer(srv, storageServer)
	resourcepb.RegisterBulkStoreServer(srv, storageServer)
	resourcepb.RegisterBlobStoreServer(srv, storageServer)
	resourcepb.RegisterDiagnosticsServer(srv, storageServer)
	resourcepb.RegisterQuotasServer(srv, storageServer)
	// Register search services
	resourcepb.RegisterResourceIndexServer(srv, searchServer)
	resourcepb.RegisterManagedObjectIndexServer(srv, searchServer)
	grpc_health_v1.RegisterHealthServer(srv, healthService)

	// register reflection service
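The combined service now builds the SQL backend once and passes the same instance to both NewSearchServer and NewStorageServer, because each backend registers its collectors on the shared Prometheus registry and registering the same metric twice fails. A short sketch of that failure mode, using a hypothetical counter name in place of the backend's real metrics:

// Sketch of why the backend is created once and shared: a second
// registration of the same metric on one registry is rejected with
// AlreadyRegisteredError.
package main

import (
	"errors"
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func newBackendMetrics(reg prometheus.Registerer) error {
	c := prometheus.NewCounter(prometheus.CounterOpts{
		Name: "storage_backend_operations_total", // hypothetical name
		Help: "Hypothetical backend counter.",
	})
	return reg.Register(c)
}

func main() {
	reg := prometheus.NewRegistry()

	fmt.Println(newBackendMetrics(reg)) // <nil>: first registration succeeds

	err := newBackendMetrics(reg) // a second "backend" registers the same metric
	are := prometheus.AlreadyRegisteredError{}
	fmt.Println(errors.As(err, &are)) // true: duplicate registration is rejected
}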
@@ -32,6 +32,7 @@ func RunTestSearchAndStorage(t *testing.T, ctx context.Context, backend resource
	nsPrefix := "test-ns"

	var server resource.ResourceServer
	var searchServer resource.SearchServer

	t.Run("Create initial resources in storage", func(t *testing.T) {
		initialResources := []struct {
@@ -96,25 +97,34 @@ func RunTestSearchAndStorage(t *testing.T, ctx context.Context, backend resource
	})

	t.Run("Create a resource server with both backends", func(t *testing.T) {
		// Create a resource server with both backends
		// Create search server first
		var err error
		server, err = resource.NewResourceServer(resource.ResourceServerOptions{
			Backend: backend,
			Search: resource.SearchOptions{
				Backend: searchBackend,
				Resources: &resource.TestDocumentBuilderSupplier{
					GroupsResources: map[string]string{
						"test.grafana.app": "testresources",
					},
		searchOpts := resource.SearchOptions{
			Backend: searchBackend,
			Resources: &resource.TestDocumentBuilderSupplier{
				GroupsResources: map[string]string{
					"test.grafana.app": "testresources",
				},
			},
		}
		searchServer, err = resource.NewSearchServer(searchOpts, backend, nil, nil, nil, nil)
		require.NoError(t, err)
		require.NotNil(t, searchServer)

		// Initialize the search server
		err = searchServer.Init(ctx)
		require.NoError(t, err)

		// Create a resource server separately from the search server
		server, err = resource.NewResourceServer(resource.ResourceServerOptions{
			Backend: backend,
		})
		require.NoError(t, err)
	})

	t.Run("Search for initial resources", func(t *testing.T) {
		// Test 1: Search for initial resources
		searchResp, err := server.Search(ctx, &resourcepb.ResourceSearchRequest{
		searchResp, err := searchServer.Search(ctx, &resourcepb.ResourceSearchRequest{
			Options: &resourcepb.ListOptions{
				Key: &resourcepb.ResourceKey{
					Group: "test.grafana.app",
@@ -194,7 +204,7 @@ func RunTestSearchAndStorage(t *testing.T, ctx context.Context, backend resource
	})

	t.Run("Search for documents", func(t *testing.T) {
		searchResp, err := server.Search(ctx, &resourcepb.ResourceSearchRequest{
		searchResp, err := searchServer.Search(ctx, &resourcepb.ResourceSearchRequest{
			Options: &resourcepb.ListOptions{
				Key: &resourcepb.ResourceKey{
					Group: "test.grafana.app",
@@ -212,7 +222,7 @@ func RunTestSearchAndStorage(t *testing.T, ctx context.Context, backend resource
	})

	t.Run("Search with tags", func(t *testing.T) {
		searchResp, err := server.Search(ctx, &resourcepb.ResourceSearchRequest{
		searchResp, err := searchServer.Search(ctx, &resourcepb.ResourceSearchRequest{
			Options: &resourcepb.ListOptions{
				Key: &resourcepb.ResourceKey{
					Group: "test.grafana.app",
@@ -231,7 +241,7 @@ func RunTestSearchAndStorage(t *testing.T, ctx context.Context, backend resource
		require.Equal(t, int64(0), searchResp.TotalHits)

		// this is the correct way of searching by tag
		searchResp, err = server.Search(ctx, &resourcepb.ResourceSearchRequest{
		searchResp, err = searchServer.Search(ctx, &resourcepb.ResourceSearchRequest{
			Options: &resourcepb.ListOptions{
				Key: &resourcepb.ResourceKey{
					Group: "test.grafana.app",
@@ -253,7 +263,7 @@ func RunTestSearchAndStorage(t *testing.T, ctx context.Context, backend resource
	})

	t.Run("Search with specific tag", func(t *testing.T) {
		searchResp, err := server.Search(ctx, &resourcepb.ResourceSearchRequest{
		searchResp, err := searchServer.Search(ctx, &resourcepb.ResourceSearchRequest{
			Options: &resourcepb.ListOptions{
				Key: &resourcepb.ResourceKey{
					Group: "test.grafana.app",
@@ -272,7 +282,7 @@ func RunTestSearchAndStorage(t *testing.T, ctx context.Context, backend resource
		require.Equal(t, int64(0), searchResp.TotalHits)

		// this is the correct way of searching by tag
		searchResp, err = server.Search(ctx, &resourcepb.ResourceSearchRequest{
		searchResp, err = searchServer.Search(ctx, &resourcepb.ResourceSearchRequest{
			Options: &resourcepb.ListOptions{
				Key: &resourcepb.ResourceKey{
					Group: "test.grafana.app",
@@ -134,7 +134,7 @@ func StartGrafanaEnvWithDB(t *testing.T, grafDir, cfgPath string) (string, *serv
	})

	// UnifiedStorageOverGRPC
	var storage sql.UnifiedStorageGrpcService
	var storage sql.UnifiedGrpcService
	if runstore {
		storage, err = sql.ProvideUnifiedStorageGrpcService(env.Cfg, env.FeatureToggles, env.SQLStore,
			env.Cfg.Logger, prometheus.NewPedanticRegistry(), nil, nil, nil, nil, kv.Config{}, nil, nil)