mirror of
https://github.com/grafana/grafana.git
synced 2026-01-14 21:25:50 +00:00
Compare commits
6 Commits
ash/react-
...
feature/ex
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b0bb71f834 | ||
|
|
a7aa55f908 | ||
|
|
0a61846b5e | ||
|
|
785cc739ee | ||
|
|
4913baaf04 | ||
|
|
c8f1efe7c7 |
@@ -10,6 +10,7 @@ const (
|
||||
SearchServerRing string = "search-server-ring"
|
||||
SearchServerDistributor string = "search-server-distributor"
|
||||
StorageServer string = "storage-server"
|
||||
SearchServer string = "search-server"
|
||||
ZanzanaServer string = "zanzana-server"
|
||||
InstrumentationServer string = "instrumentation-server"
|
||||
FrontendServer string = "frontend-server"
|
||||
@@ -21,6 +22,7 @@ var dependencyMap = map[string][]string{
|
||||
SearchServerRing: {InstrumentationServer, MemberlistKV},
|
||||
GrafanaAPIServer: {InstrumentationServer},
|
||||
StorageServer: {InstrumentationServer, SearchServerRing},
|
||||
SearchServer: {InstrumentationServer, SearchServerRing},
|
||||
ZanzanaServer: {InstrumentationServer},
|
||||
SearchServerDistributor: {InstrumentationServer, MemberlistKV, SearchServerRing},
|
||||
Core: {},
|
||||
|
||||
@@ -38,9 +38,9 @@ func (d *directResourceClient) GetBlob(ctx context.Context, in *resourcepb.GetBl
|
||||
return d.server.GetBlob(ctx, in)
|
||||
}
|
||||
|
||||
// GetStats implements ResourceClient.
|
||||
// GetStats implements ResourceClient (SearchClient).
|
||||
func (d *directResourceClient) GetStats(ctx context.Context, in *resourcepb.ResourceStatsRequest, opts ...grpc.CallOption) (*resourcepb.ResourceStatsResponse, error) {
|
||||
return d.server.GetStats(ctx, in)
|
||||
return nil, fmt.Errorf("GetStats not supported with direct resource client")
|
||||
}
|
||||
|
||||
// IsHealthy implements ResourceClient.
|
||||
@@ -53,12 +53,14 @@ func (d *directResourceClient) List(ctx context.Context, in *resourcepb.ListRequ
|
||||
return d.server.List(ctx, in)
|
||||
}
|
||||
|
||||
// ListManagedObjects implements ResourceClient (SearchClient).
|
||||
func (d *directResourceClient) ListManagedObjects(ctx context.Context, in *resourcepb.ListManagedObjectsRequest, opts ...grpc.CallOption) (*resourcepb.ListManagedObjectsResponse, error) {
|
||||
return d.server.ListManagedObjects(ctx, in)
|
||||
return nil, fmt.Errorf("ListManagedObjects not supported with direct resource client")
|
||||
}
|
||||
|
||||
// CountManagedObjects implements ResourceClient (SearchClient).
|
||||
func (d *directResourceClient) CountManagedObjects(ctx context.Context, in *resourcepb.CountManagedObjectsRequest, opts ...grpc.CallOption) (*resourcepb.CountManagedObjectsResponse, error) {
|
||||
return d.server.CountManagedObjects(ctx, in)
|
||||
return nil, fmt.Errorf("CountManagedObjects not supported with direct resource client")
|
||||
}
|
||||
|
||||
// PutBlob implements ResourceClient.
|
||||
@@ -71,9 +73,9 @@ func (d *directResourceClient) Read(ctx context.Context, in *resourcepb.ReadRequ
|
||||
return d.server.Read(ctx, in)
|
||||
}
|
||||
|
||||
// Search implements ResourceClient.
|
||||
// Search implements ResourceClient (SearchClient).
|
||||
func (d *directResourceClient) Search(ctx context.Context, in *resourcepb.ResourceSearchRequest, opts ...grpc.CallOption) (*resourcepb.ResourceSearchResponse, error) {
|
||||
return d.server.Search(ctx, in)
|
||||
return nil, fmt.Errorf("Search not supported with direct resource client")
|
||||
}
|
||||
|
||||
// Update implements ResourceClient.
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
"k8s.io/apiserver/pkg/registry/rest"
|
||||
|
||||
"github.com/grafana/authlib/types"
|
||||
|
||||
"github.com/grafana/grafana/pkg/apimachinery/utils"
|
||||
grafanaregistry "github.com/grafana/grafana/pkg/apiserver/registry/generic"
|
||||
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"
|
||||
|
||||
@@ -37,7 +37,6 @@ import (
|
||||
"github.com/grafana/grafana/pkg/services/folder"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/apistore"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resource"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
||||
)
|
||||
|
||||
@@ -74,7 +73,7 @@ func RegisterAPIService(cfg *setting.Cfg,
|
||||
acService accesscontrol.Service,
|
||||
accessClient authlib.AccessClient,
|
||||
registerer prometheus.Registerer,
|
||||
unified resource.ResourceClient,
|
||||
unified resourcepb.ResourceIndexClient,
|
||||
zanzanaClient zanzana.Client,
|
||||
) *FolderAPIBuilder {
|
||||
builder := &FolderAPIBuilder{
|
||||
@@ -93,7 +92,7 @@ func RegisterAPIService(cfg *setting.Cfg,
|
||||
return builder
|
||||
}
|
||||
|
||||
func NewAPIService(ac authlib.AccessClient, searcher resource.ResourceClient, features featuremgmt.FeatureToggles, zanzanaClient zanzana.Client, resourcePermissionsSvc *dynamic.NamespaceableResourceInterface) *FolderAPIBuilder {
|
||||
func NewAPIService(ac authlib.AccessClient, searcher resourcepb.ResourceIndexClient, features featuremgmt.FeatureToggles, zanzanaClient zanzana.Client, resourcePermissionsSvc *dynamic.NamespaceableResourceInterface) *FolderAPIBuilder {
|
||||
return &FolderAPIBuilder{
|
||||
features: features,
|
||||
accessClient: ac,
|
||||
|
||||
@@ -742,7 +742,7 @@ func NewLocalStore(resourceInfo utils.ResourceInfo, scheme *runtime.Scheme, defa
|
||||
return nil, err
|
||||
}
|
||||
|
||||
client := resource.NewLocalResourceClient(server)
|
||||
client := resource.NewLocalResourceClient(server, nil)
|
||||
optsGetter := apistore.NewRESTOptionsGetterForClient(client, nil, defaultOpts.StorageConfig.Config, nil)
|
||||
|
||||
store, err := grafanaregistry.NewRegistryStore(scheme, resourceInfo, optsGetter)
|
||||
|
||||
@@ -205,6 +205,14 @@ func (s *ModuleServer) Run() error {
|
||||
return sql.ProvideUnifiedStorageGrpcService(s.cfg, s.features, nil, s.log, s.registerer, docBuilders, s.storageMetrics, s.indexMetrics, s.searchServerRing, s.MemberlistKVConfig, s.httpServerRouter, s.storageBackend)
|
||||
})
|
||||
|
||||
m.RegisterModule(modules.SearchServer, func() (services.Service, error) {
|
||||
docBuilders, err := InitializeDocumentBuilders(s.cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return sql.ProvideSearchGrpcService(s.cfg, s.features, nil, s.log, s.registerer, docBuilders, s.indexMetrics, s.searchServerRing, s.MemberlistKVConfig, s.storageBackend)
|
||||
})
|
||||
|
||||
m.RegisterModule(modules.ZanzanaServer, func() (services.Service, error) {
|
||||
return authz.ProvideZanzanaService(s.cfg, s.features, s.registerer)
|
||||
})
|
||||
|
||||
@@ -65,6 +65,7 @@ import (
|
||||
"github.com/grafana/grafana/pkg/storage/legacysql"
|
||||
"github.com/grafana/grafana/pkg/storage/unified"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resource"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
||||
search2 "github.com/grafana/grafana/pkg/storage/unified/search"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/search/builders"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/sql"
|
||||
@@ -146,6 +147,7 @@ var wireExtsBasicSet = wire.NewSet(
|
||||
wire.Bind(new(sandbox.Sandbox), new(*sandbox.Service)),
|
||||
wire.Struct(new(unified.Options), "*"),
|
||||
unified.ProvideUnifiedStorageClient,
|
||||
wire.Bind(new(resourcepb.ResourceIndexClient), new(resource.ResourceClient)),
|
||||
sql.ProvideStorageBackend,
|
||||
builder.ProvideDefaultBuildHandlerChainFuncFromBuilders,
|
||||
aggregatorrunner.ProvideNoopAggregatorConfigurator,
|
||||
|
||||
@@ -623,6 +623,8 @@ type Cfg struct {
|
||||
OverridesFilePath string
|
||||
OverridesReloadInterval time.Duration
|
||||
EnableSQLKVBackend bool
|
||||
SearchMode string // "", "embedded", "remote" - empty defaults to embedded
|
||||
SearchServerAddress string // gRPC address for remote search server
|
||||
|
||||
// Secrets Management
|
||||
SecretsManagement SecretsManagerSettings
|
||||
|
||||
@@ -136,6 +136,10 @@ func (cfg *Cfg) setUnifiedStorageConfig() {
|
||||
// use sqlkv (resource/sqlkv) instead of the sql backend (sql/backend) as the StorageServer
|
||||
cfg.EnableSQLKVBackend = section.Key("enable_sqlkv_backend").MustBool(false)
|
||||
|
||||
// search mode: "", "embedded", "remote" - empty defaults to embedded for backward compatibility
|
||||
cfg.SearchMode = section.Key("search_mode").MustString("")
|
||||
cfg.SearchServerAddress = section.Key("search_server_address").String()
|
||||
|
||||
cfg.MaxFileIndexAge = section.Key("max_file_index_age").MustDuration(0)
|
||||
cfg.MinFileIndexBuildVersion = section.Key("min_file_index_build_version").MustString("")
|
||||
}
|
||||
|
||||
@@ -79,7 +79,7 @@ func NewRESTOptionsGetterMemory(originalStorageConfig storagebackend.Config, sec
|
||||
}
|
||||
|
||||
return NewRESTOptionsGetterForClient(
|
||||
resource.NewLocalResourceClient(server),
|
||||
resource.NewLocalResourceClient(server, nil),
|
||||
secrets,
|
||||
originalStorageConfig,
|
||||
nil,
|
||||
@@ -118,7 +118,7 @@ func NewRESTOptionsGetterForFileXX(path string,
|
||||
}
|
||||
|
||||
return NewRESTOptionsGetterForClient(
|
||||
resource.NewLocalResourceClient(server),
|
||||
resource.NewLocalResourceClient(server, nil),
|
||||
nil, // secrets
|
||||
originalStorageConfig,
|
||||
nil,
|
||||
|
||||
@@ -156,7 +156,7 @@ func testSetup(t testing.TB, opts ...setupOption) (context.Context, storage.Inte
|
||||
default:
|
||||
t.Fatalf("unsupported storage type: %s", setupOpts.storageType)
|
||||
}
|
||||
client := resource.NewLocalResourceClient(server)
|
||||
client := resource.NewLocalResourceClient(server, nil)
|
||||
|
||||
config := storagebackend.NewDefaultConfig(setupOpts.prefix, setupOpts.codec)
|
||||
store, destroyFunc, err := apistore.NewStorage(
|
||||
|
||||
@@ -21,6 +21,7 @@ import (
|
||||
"github.com/grafana/dskit/grpcclient"
|
||||
"github.com/grafana/dskit/middleware"
|
||||
"github.com/grafana/dskit/services"
|
||||
|
||||
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"
|
||||
infraDB "github.com/grafana/grafana/pkg/infra/db"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
@@ -135,7 +136,7 @@ func newClient(opts options.StorageOptions,
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return resource.NewLocalResourceClient(server), nil
|
||||
return resource.NewLocalResourceClient(server, nil), nil
|
||||
|
||||
case options.StorageTypeUnifiedGrpc:
|
||||
if opts.Address == "" {
|
||||
@@ -224,11 +225,11 @@ func newClient(opts options.StorageOptions,
|
||||
serverOptions.OverridesService = overridesSvc
|
||||
}
|
||||
|
||||
server, err := sql.NewResourceServer(serverOptions)
|
||||
server, searchServer, err := sql.NewResourceServer(serverOptions)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return resource.NewLocalResourceClient(server), nil
|
||||
return resource.NewLocalResourceClient(server, searchServer), nil
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -31,15 +31,20 @@ import (
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
||||
)
|
||||
|
||||
type SearchClient interface {
|
||||
resourcepb.ResourceIndexClient
|
||||
resourcepb.ManagedObjectIndexClient
|
||||
}
|
||||
|
||||
//go:generate mockery --name ResourceClient --structname MockResourceClient --inpackage --filename client_mock.go --with-expecter
|
||||
type ResourceClient interface {
|
||||
resourcepb.ResourceStoreClient
|
||||
resourcepb.ResourceIndexClient
|
||||
resourcepb.ManagedObjectIndexClient
|
||||
resourcepb.BulkStoreClient
|
||||
resourcepb.BlobStoreClient
|
||||
resourcepb.DiagnosticsClient
|
||||
resourcepb.QuotasClient
|
||||
// SearchClient methods are included for convenience - the client typically needs both
|
||||
SearchClient
|
||||
}
|
||||
|
||||
// Internal implementation
|
||||
@@ -92,16 +97,15 @@ func NewLegacyResourceClient(channel grpc.ClientConnInterface, indexChannel grpc
|
||||
return newResourceClient(cc, cci)
|
||||
}
|
||||
|
||||
func NewLocalResourceClient(server ResourceServer) ResourceClient {
|
||||
func NewLocalResourceClient(server ResourceServer, searchServer SearchServer) ResourceClient {
|
||||
// scenario: local in-proc
|
||||
channel := &inprocgrpc.Channel{}
|
||||
indexChannel := &inprocgrpc.Channel{}
|
||||
tracer := otel.Tracer("github.com/grafana/grafana/pkg/storage/unified/resource")
|
||||
|
||||
grpcAuthInt := grpcutils.NewUnsafeAuthenticator(tracer)
|
||||
for _, desc := range []*grpc.ServiceDesc{
|
||||
&resourcepb.ResourceStore_ServiceDesc,
|
||||
&resourcepb.ResourceIndex_ServiceDesc,
|
||||
&resourcepb.ManagedObjectIndex_ServiceDesc,
|
||||
&resourcepb.BlobStore_ServiceDesc,
|
||||
&resourcepb.BulkStore_ServiceDesc,
|
||||
&resourcepb.Diagnostics_ServiceDesc,
|
||||
@@ -117,13 +121,31 @@ func NewLocalResourceClient(server ResourceServer) ResourceClient {
|
||||
)
|
||||
}
|
||||
|
||||
// Register search services on the index channel if searchServer is provided
|
||||
if searchServer != nil {
|
||||
for _, desc := range []*grpc.ServiceDesc{
|
||||
&resourcepb.ResourceIndex_ServiceDesc,
|
||||
&resourcepb.ManagedObjectIndex_ServiceDesc,
|
||||
} {
|
||||
indexChannel.RegisterService(
|
||||
grpchan.InterceptServer(
|
||||
desc,
|
||||
grpcAuth.UnaryServerInterceptor(grpcAuthInt),
|
||||
grpcAuth.StreamServerInterceptor(grpcAuthInt),
|
||||
),
|
||||
searchServer,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
clientInt := authnlib.NewGrpcClientInterceptor(
|
||||
ProvideInProcExchanger(),
|
||||
authnlib.WithClientInterceptorIDTokenExtractor(idTokenExtractor),
|
||||
)
|
||||
|
||||
cc := grpchan.InterceptClientConn(channel, clientInt.UnaryClientInterceptor, clientInt.StreamClientInterceptor)
|
||||
return newResourceClient(cc, cc)
|
||||
cci := grpchan.InterceptClientConn(indexChannel, clientInt.UnaryClientInterceptor, clientInt.StreamClientInterceptor)
|
||||
return newResourceClient(cc, cci)
|
||||
}
|
||||
|
||||
type RemoteResourceClientConfig struct {
|
||||
|
||||
@@ -127,7 +127,10 @@ type SearchBackend interface {
|
||||
GetOpenIndexes() []NamespacedResource
|
||||
}
|
||||
|
||||
// This supports indexing+search regardless of implementation
|
||||
var _ SearchServer = &searchSupport{}
|
||||
|
||||
// This supports indexing+search regardless of implementation.
|
||||
// Implements SearchServer interface.
|
||||
type searchSupport struct {
|
||||
log log.Logger
|
||||
storage StorageBackend
|
||||
@@ -160,6 +163,10 @@ var (
|
||||
_ resourcepb.ManagedObjectIndexServer = (*searchSupport)(nil)
|
||||
)
|
||||
|
||||
func NewSearchServer(opts SearchOptions, storage StorageBackend, access types.AccessClient, blob BlobSupport, indexMetrics *BleveIndexMetrics, ownsIndexFn func(key NamespacedResource) (bool, error)) (SearchServer, error) {
|
||||
return newSearchSupport(opts, storage, access, blob, indexMetrics, ownsIndexFn)
|
||||
}
|
||||
|
||||
func newSearchSupport(opts SearchOptions, storage StorageBackend, access types.AccessClient, blob BlobSupport, indexMetrics *BleveIndexMetrics, ownsIndexFn func(key NamespacedResource) (bool, error)) (support *searchSupport, err error) {
|
||||
// No backend search support
|
||||
if opts.Backend == nil {
|
||||
@@ -598,6 +605,15 @@ func (s *searchSupport) buildIndexes(ctx context.Context) (int, error) {
|
||||
return totalBatchesIndexed, nil
|
||||
}
|
||||
|
||||
func (s *searchSupport) Init(ctx context.Context) error {
|
||||
return s.init(ctx)
|
||||
}
|
||||
|
||||
func (s *searchSupport) Stop(_ context.Context) error {
|
||||
s.stop()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *searchSupport) init(ctx context.Context) error {
|
||||
origCtx := ctx
|
||||
|
||||
|
||||
@@ -60,7 +60,7 @@ func ProvideSearchDistributorServer(cfg *setting.Cfg, features featuremgmt.Featu
|
||||
}
|
||||
|
||||
type RingClient struct {
|
||||
Client ResourceClient
|
||||
Client SearchClient
|
||||
grpc_health_v1.HealthClient
|
||||
Conn *grpc.ClientConn
|
||||
}
|
||||
@@ -99,7 +99,7 @@ var (
|
||||
func (ds *distributorServer) Search(ctx context.Context, r *resourcepb.ResourceSearchRequest) (*resourcepb.ResourceSearchResponse, error) {
|
||||
ctx, span := ds.tracing.Start(ctx, "distributor.Search")
|
||||
defer span.End()
|
||||
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Options.Key.Namespace, "Search")
|
||||
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Options.Key.Namespace)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -110,7 +110,7 @@ func (ds *distributorServer) Search(ctx context.Context, r *resourcepb.ResourceS
|
||||
func (ds *distributorServer) GetStats(ctx context.Context, r *resourcepb.ResourceStatsRequest) (*resourcepb.ResourceStatsResponse, error) {
|
||||
ctx, span := ds.tracing.Start(ctx, "distributor.GetStats")
|
||||
defer span.End()
|
||||
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace, "GetStats")
|
||||
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -215,7 +215,7 @@ func (ds *distributorServer) RebuildIndexes(ctx context.Context, r *resourcepb.R
|
||||
func (ds *distributorServer) CountManagedObjects(ctx context.Context, r *resourcepb.CountManagedObjectsRequest) (*resourcepb.CountManagedObjectsResponse, error) {
|
||||
ctx, span := ds.tracing.Start(ctx, "distributor.CountManagedObjects")
|
||||
defer span.End()
|
||||
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace, "CountManagedObjects")
|
||||
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -226,7 +226,7 @@ func (ds *distributorServer) CountManagedObjects(ctx context.Context, r *resourc
|
||||
func (ds *distributorServer) ListManagedObjects(ctx context.Context, r *resourcepb.ListManagedObjectsRequest) (*resourcepb.ListManagedObjectsResponse, error) {
|
||||
ctx, span := ds.tracing.Start(ctx, "distributor.ListManagedObjects")
|
||||
defer span.End()
|
||||
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace, "ListManagedObjects")
|
||||
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -234,7 +234,7 @@ func (ds *distributorServer) ListManagedObjects(ctx context.Context, r *resource
|
||||
return client.ListManagedObjects(ctx, r)
|
||||
}
|
||||
|
||||
func (ds *distributorServer) getClientToDistributeRequest(ctx context.Context, namespace string, methodName string) (context.Context, ResourceClient, error) {
|
||||
func (ds *distributorServer) getClientToDistributeRequest(ctx context.Context, namespace string) (context.Context, SearchClient, error) {
|
||||
ringHasher := fnv.New32a()
|
||||
_, err := ringHasher.Write([]byte(namespace))
|
||||
if err != nil {
|
||||
|
||||
@@ -34,12 +34,17 @@ import (
|
||||
|
||||
var tracer = otel.Tracer("github.com/grafana/grafana/pkg/storage/unified/resource")
|
||||
|
||||
type SearchServer interface {
|
||||
LifecycleHooks
|
||||
|
||||
resourcepb.ResourceIndexServer
|
||||
resourcepb.ManagedObjectIndexServer
|
||||
}
|
||||
|
||||
// ResourceServer implements all gRPC services
|
||||
type ResourceServer interface {
|
||||
resourcepb.ResourceStoreServer
|
||||
resourcepb.BulkStoreServer
|
||||
resourcepb.ResourceIndexServer
|
||||
resourcepb.ManagedObjectIndexServer
|
||||
resourcepb.BlobStoreServer
|
||||
resourcepb.DiagnosticsServer
|
||||
resourcepb.QuotasServer
|
||||
@@ -222,7 +227,8 @@ type ResourceServerOptions struct {
|
||||
Blob BlobConfig
|
||||
|
||||
// Search options
|
||||
Search SearchOptions
|
||||
SearchOptions SearchOptions // TODO: needed?
|
||||
Search SearchServer
|
||||
|
||||
// Quota service
|
||||
OverridesService *OverridesService
|
||||
@@ -251,16 +257,12 @@ type ResourceServerOptions struct {
|
||||
|
||||
storageMetrics *StorageMetrics
|
||||
|
||||
IndexMetrics *BleveIndexMetrics
|
||||
|
||||
// MaxPageSizeBytes is the maximum size of a page in bytes.
|
||||
MaxPageSizeBytes int
|
||||
|
||||
// QOSQueue is the quality of service queue used to enqueue
|
||||
QOSQueue QOSEnqueuer
|
||||
QOSConfig QueueConfig
|
||||
|
||||
OwnsIndexFn func(key NamespacedResource) (bool, error)
|
||||
}
|
||||
|
||||
func NewResourceServer(opts ResourceServerOptions) (*server, error) {
|
||||
@@ -343,23 +345,25 @@ func NewResourceServer(opts ResourceServerOptions) (*server, error) {
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
storageMetrics: opts.storageMetrics,
|
||||
indexMetrics: opts.IndexMetrics,
|
||||
maxPageSizeBytes: opts.MaxPageSizeBytes,
|
||||
reg: opts.Reg,
|
||||
queue: opts.QOSQueue,
|
||||
queueConfig: opts.QOSConfig,
|
||||
overridesService: opts.OverridesService,
|
||||
search: opts.Search,
|
||||
|
||||
artificialSuccessfulWriteDelay: opts.Search.IndexMinUpdateInterval,
|
||||
artificialSuccessfulWriteDelay: opts.SearchOptions.IndexMinUpdateInterval,
|
||||
}
|
||||
|
||||
if opts.Search.Resources != nil {
|
||||
var err error
|
||||
s.search, err = newSearchSupport(opts.Search, s.backend, s.access, s.blob, opts.IndexMetrics, opts.OwnsIndexFn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
/*
|
||||
if opts.Search.Resources != nil {
|
||||
var err error
|
||||
s.search, err = newSearchSupport(opts.Search, s.backend, s.access, s.blob, opts.IndexMetrics, opts.OwnsIndexFn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
err := s.Init(ctx)
|
||||
if err != nil {
|
||||
@@ -377,7 +381,7 @@ type server struct {
|
||||
backend StorageBackend
|
||||
blob BlobSupport
|
||||
secure secrets.InlineSecureValueSupport
|
||||
search *searchSupport
|
||||
search SearchServer
|
||||
diagnostics resourcepb.DiagnosticsServer
|
||||
access claims.AccessClient
|
||||
writeHooks WriteAccessHooks
|
||||
@@ -424,11 +428,6 @@ func (s *server) Init(ctx context.Context) error {
|
||||
s.initErr = s.overridesService.init(ctx)
|
||||
}
|
||||
|
||||
// initialize the search index
|
||||
if s.initErr == nil && s.search != nil {
|
||||
s.initErr = s.search.init(ctx)
|
||||
}
|
||||
|
||||
// Start watching for changes
|
||||
if s.initErr == nil {
|
||||
s.initErr = s.initWatcher()
|
||||
@@ -453,10 +452,6 @@ func (s *server) Stop(ctx context.Context) error {
|
||||
}
|
||||
}
|
||||
|
||||
if s.search != nil {
|
||||
s.search.stop()
|
||||
}
|
||||
|
||||
if s.overridesService != nil {
|
||||
if err := s.overridesService.stop(ctx); err != nil {
|
||||
stopFailed = true
|
||||
@@ -1372,47 +1367,6 @@ func (s *server) Watch(req *resourcepb.WatchRequest, srv resourcepb.ResourceStor
|
||||
}
|
||||
}
|
||||
|
||||
func (s *server) Search(ctx context.Context, req *resourcepb.ResourceSearchRequest) (*resourcepb.ResourceSearchResponse, error) {
|
||||
if s.search == nil {
|
||||
return nil, fmt.Errorf("search index not configured")
|
||||
}
|
||||
|
||||
return s.search.Search(ctx, req)
|
||||
}
|
||||
|
||||
// GetStats implements ResourceServer.
|
||||
func (s *server) GetStats(ctx context.Context, req *resourcepb.ResourceStatsRequest) (*resourcepb.ResourceStatsResponse, error) {
|
||||
if err := s.Init(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if s.search == nil {
|
||||
// If the backend implements "GetStats", we can use it
|
||||
srv, ok := s.backend.(resourcepb.ResourceIndexServer)
|
||||
if ok {
|
||||
return srv.GetStats(ctx, req)
|
||||
}
|
||||
return nil, fmt.Errorf("search index not configured")
|
||||
}
|
||||
return s.search.GetStats(ctx, req)
|
||||
}
|
||||
|
||||
func (s *server) ListManagedObjects(ctx context.Context, req *resourcepb.ListManagedObjectsRequest) (*resourcepb.ListManagedObjectsResponse, error) {
|
||||
if s.search == nil {
|
||||
return nil, fmt.Errorf("search index not configured")
|
||||
}
|
||||
|
||||
return s.search.ListManagedObjects(ctx, req)
|
||||
}
|
||||
|
||||
func (s *server) CountManagedObjects(ctx context.Context, req *resourcepb.CountManagedObjectsRequest) (*resourcepb.CountManagedObjectsResponse, error) {
|
||||
if s.search == nil {
|
||||
return nil, fmt.Errorf("search index not configured")
|
||||
}
|
||||
|
||||
return s.search.CountManagedObjects(ctx, req)
|
||||
}
|
||||
|
||||
// IsHealthy implements ResourceServer.
|
||||
func (s *server) IsHealthy(ctx context.Context, req *resourcepb.HealthCheckRequest) (*resourcepb.HealthCheckResponse, error) {
|
||||
return s.diagnostics.IsHealthy(ctx, req)
|
||||
|
||||
83
pkg/storage/unified/sql/remote_search.go
Normal file
83
pkg/storage/unified/sql/remote_search.go
Normal file
@@ -0,0 +1,83 @@
|
||||
package sql
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resource"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
||||
)
|
||||
|
||||
var _ resource.SearchServer = (*remoteSearchClient)(nil)
|
||||
|
||||
// remoteSearchClient wraps gRPC search clients to implement the SearchServer interface.
|
||||
// This allows the storage server to delegate search operations to a remote search server.
|
||||
type remoteSearchClient struct {
|
||||
conn *grpc.ClientConn
|
||||
index resourcepb.ResourceIndexClient
|
||||
moiClient resourcepb.ManagedObjectIndexClient
|
||||
}
|
||||
|
||||
// newRemoteSearchClient creates a new remote search client that connects to a search server at the given address.
|
||||
func newRemoteSearchClient(address string) (*remoteSearchClient, error) {
|
||||
if address == "" {
|
||||
return nil, fmt.Errorf("search server address is required for remote search mode")
|
||||
}
|
||||
|
||||
conn, err := grpc.NewClient(address,
|
||||
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create gRPC connection to search server: %w", err)
|
||||
}
|
||||
|
||||
return &remoteSearchClient{
|
||||
conn: conn,
|
||||
index: resourcepb.NewResourceIndexClient(conn),
|
||||
moiClient: resourcepb.NewManagedObjectIndexClient(conn),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Init implements resource.LifecycleHooks.
|
||||
// For remote search, there's nothing to initialize locally.
|
||||
func (r *remoteSearchClient) Init(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop implements resource.LifecycleHooks.
|
||||
// Closes the gRPC connection.
|
||||
func (r *remoteSearchClient) Stop(ctx context.Context) error {
|
||||
if r.conn != nil {
|
||||
return r.conn.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Search implements resourcepb.ResourceIndexServer.
|
||||
func (r *remoteSearchClient) Search(ctx context.Context, req *resourcepb.ResourceSearchRequest) (*resourcepb.ResourceSearchResponse, error) {
|
||||
return r.index.Search(ctx, req)
|
||||
}
|
||||
|
||||
// GetStats implements resourcepb.ResourceIndexServer.
|
||||
func (r *remoteSearchClient) GetStats(ctx context.Context, req *resourcepb.ResourceStatsRequest) (*resourcepb.ResourceStatsResponse, error) {
|
||||
return r.index.GetStats(ctx, req)
|
||||
}
|
||||
|
||||
// RebuildIndexes implements resourcepb.ResourceIndexServer.
|
||||
func (r *remoteSearchClient) RebuildIndexes(ctx context.Context, req *resourcepb.RebuildIndexesRequest) (*resourcepb.RebuildIndexesResponse, error) {
|
||||
return r.index.RebuildIndexes(ctx, req)
|
||||
}
|
||||
|
||||
// CountManagedObjects implements resourcepb.ManagedObjectIndexServer.
|
||||
func (r *remoteSearchClient) CountManagedObjects(ctx context.Context, req *resourcepb.CountManagedObjectsRequest) (*resourcepb.CountManagedObjectsResponse, error) {
|
||||
return r.moiClient.CountManagedObjects(ctx, req)
|
||||
}
|
||||
|
||||
// ListManagedObjects implements resourcepb.ManagedObjectIndexServer.
|
||||
func (r *remoteSearchClient) ListManagedObjects(ctx context.Context, req *resourcepb.ListManagedObjectsRequest) (*resourcepb.ListManagedObjectsResponse, error) {
|
||||
return r.moiClient.ListManagedObjects(ctx, req)
|
||||
}
|
||||
353
pkg/storage/unified/sql/search_service.go
Normal file
353
pkg/storage/unified/sql/search_service.go
Normal file
@@ -0,0 +1,353 @@
|
||||
package sql
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"hash/fnv"
|
||||
"net"
|
||||
"os"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"google.golang.org/grpc/health/grpc_health_v1"
|
||||
|
||||
"github.com/grafana/dskit/kv"
|
||||
"github.com/grafana/dskit/netutil"
|
||||
"github.com/grafana/dskit/ring"
|
||||
"github.com/grafana/dskit/services"
|
||||
|
||||
infraDB "github.com/grafana/grafana/pkg/infra/db"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/modules"
|
||||
"github.com/grafana/grafana/pkg/services/authz"
|
||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||
"github.com/grafana/grafana/pkg/services/grpcserver"
|
||||
"github.com/grafana/grafana/pkg/services/grpcserver/interceptors"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resource"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resource/grpc"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/search"
|
||||
)
|
||||
|
||||
var (
|
||||
_ SearchGrpcService = (*searchService)(nil)
|
||||
)
|
||||
|
||||
// SearchGrpcService is the interface for the standalone search gRPC service.
|
||||
type SearchGrpcService interface {
|
||||
services.NamedService
|
||||
|
||||
// GetAddress returns the address where this service is running
|
||||
GetAddress() string
|
||||
}
|
||||
|
||||
type searchService struct {
|
||||
*services.BasicService
|
||||
|
||||
// Subservices manager
|
||||
subservices *services.Manager
|
||||
subservicesWatcher *services.FailureWatcher
|
||||
hasSubservices bool
|
||||
|
||||
cfg *setting.Cfg
|
||||
features featuremgmt.FeatureToggles
|
||||
db infraDB.DB
|
||||
stopCh chan struct{}
|
||||
stoppedCh chan error
|
||||
|
||||
handler grpcserver.Provider
|
||||
|
||||
tracing trace.Tracer
|
||||
|
||||
authenticator func(ctx context.Context) (context.Context, error)
|
||||
|
||||
log log.Logger
|
||||
reg prometheus.Registerer
|
||||
indexMetrics *resource.BleveIndexMetrics
|
||||
|
||||
docBuilders resource.DocumentBuilderSupplier
|
||||
|
||||
searchRing *ring.Ring
|
||||
ringLifecycler *ring.BasicLifecycler
|
||||
|
||||
backend resource.StorageBackend
|
||||
}
|
||||
|
||||
// ProvideSearchGrpcService creates a standalone search gRPC service.
|
||||
// This is used when running search-server as a separate target.
|
||||
func ProvideSearchGrpcService(
|
||||
cfg *setting.Cfg,
|
||||
features featuremgmt.FeatureToggles,
|
||||
db infraDB.DB,
|
||||
log log.Logger,
|
||||
reg prometheus.Registerer,
|
||||
docBuilders resource.DocumentBuilderSupplier,
|
||||
indexMetrics *resource.BleveIndexMetrics,
|
||||
searchRing *ring.Ring,
|
||||
memberlistKVConfig kv.Config,
|
||||
backend resource.StorageBackend,
|
||||
) (SearchGrpcService, error) {
|
||||
tracer := otel.Tracer("search-server")
|
||||
|
||||
authn := NewAuthenticatorWithFallback(cfg, reg, tracer, func(ctx context.Context) (context.Context, error) {
|
||||
auth := grpc.Authenticator{Tracer: tracer}
|
||||
return auth.Authenticate(ctx)
|
||||
})
|
||||
|
||||
s := &searchService{
|
||||
cfg: cfg,
|
||||
features: features,
|
||||
stopCh: make(chan struct{}),
|
||||
stoppedCh: make(chan error, 1),
|
||||
authenticator: authn,
|
||||
tracing: tracer,
|
||||
db: db,
|
||||
log: log,
|
||||
reg: reg,
|
||||
docBuilders: docBuilders,
|
||||
indexMetrics: indexMetrics,
|
||||
searchRing: searchRing,
|
||||
subservicesWatcher: services.NewFailureWatcher(),
|
||||
backend: backend,
|
||||
}
|
||||
|
||||
subservices := []services.Service{}
|
||||
if cfg.EnableSharding {
|
||||
ringStore, err := kv.NewClient(
|
||||
memberlistKVConfig,
|
||||
ring.GetCodec(),
|
||||
kv.RegistererWithKVName(reg, resource.RingName),
|
||||
log,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create KV store client: %s", err)
|
||||
}
|
||||
|
||||
lifecyclerCfg, err := toSearchLifecyclerConfig(cfg, log)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to initialize search-ring lifecycler config: %s", err)
|
||||
}
|
||||
|
||||
delegate := ring.BasicLifecyclerDelegate(ring.NewInstanceRegisterDelegate(ring.JOINING, resource.RingNumTokens))
|
||||
delegate = ring.NewLeaveOnStoppingDelegate(delegate, log)
|
||||
delegate = ring.NewAutoForgetDelegate(resource.RingHeartbeatTimeout*2, delegate, log)
|
||||
|
||||
s.ringLifecycler, err = ring.NewBasicLifecycler(
|
||||
lifecyclerCfg,
|
||||
resource.RingName,
|
||||
resource.RingKey,
|
||||
ringStore,
|
||||
delegate,
|
||||
log,
|
||||
reg,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to initialize search-ring lifecycler: %s", err)
|
||||
}
|
||||
|
||||
s.ringLifecycler.SetKeepInstanceInTheRingOnShutdown(true)
|
||||
subservices = append(subservices, s.ringLifecycler)
|
||||
}
|
||||
|
||||
if len(subservices) > 0 {
|
||||
s.hasSubservices = true
|
||||
var err error
|
||||
s.subservices, err = services.NewManager(subservices...)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create subservices manager: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
s.BasicService = services.NewBasicService(s.starting, s.running, s.stopping).WithName(modules.SearchServer)
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (s *searchService) OwnsIndex(key resource.NamespacedResource) (bool, error) {
|
||||
if s.searchRing == nil {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
if st := s.searchRing.State(); st != services.Running {
|
||||
return false, fmt.Errorf("ring is not Running: %s", st)
|
||||
}
|
||||
|
||||
ringHasher := fnv.New32a()
|
||||
_, err := ringHasher.Write([]byte(key.Namespace))
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("error hashing namespace: %w", err)
|
||||
}
|
||||
|
||||
rs, err := s.searchRing.GetWithOptions(ringHasher.Sum32(), searchOwnerRead, ring.WithReplicationFactor(s.searchRing.ReplicationFactor()))
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("error getting replicaset from ring: %w", err)
|
||||
}
|
||||
|
||||
return rs.Includes(s.ringLifecycler.GetInstanceAddr()), nil
|
||||
}
|
||||
|
||||
func (s *searchService) starting(ctx context.Context) error {
|
||||
if s.hasSubservices {
|
||||
s.subservicesWatcher.WatchManager(s.subservices)
|
||||
if err := services.StartManagerAndAwaitHealthy(ctx, s.subservices); err != nil {
|
||||
return fmt.Errorf("failed to start subservices: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
authzClient, err := authz.ProvideStandaloneAuthZClient(s.cfg, s.features, s.tracing, s.reg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
searchOptions, err := search.NewSearchOptions(s.features, s.cfg, s.docBuilders, s.indexMetrics, s.OwnsIndex)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Create search server
|
||||
searchServer, err := resource.NewSearchServer(searchOptions, s.backend, authzClient, nil, s.indexMetrics, s.OwnsIndex)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create search server: %w", err)
|
||||
}
|
||||
|
||||
if err := searchServer.Init(ctx); err != nil {
|
||||
return fmt.Errorf("failed to initialize search server: %w", err)
|
||||
}
|
||||
|
||||
s.handler, err = grpcserver.ProvideService(s.cfg, s.features, interceptors.AuthenticatorFunc(s.authenticator), s.tracing, prometheus.DefaultRegisterer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
srv := s.handler.GetServer()
|
||||
resourcepb.RegisterResourceIndexServer(srv, searchServer)
|
||||
resourcepb.RegisterManagedObjectIndexServer(srv, searchServer)
|
||||
grpc_health_v1.RegisterHealthServer(srv, &searchHealthService{searchServer: searchServer})
|
||||
|
||||
// register reflection service
|
||||
_, err = grpcserver.ProvideReflectionService(s.cfg, s.handler)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if s.cfg.EnableSharding {
|
||||
s.log.Info("waiting until search server is JOINING in the ring")
|
||||
lfcCtx, cancel := context.WithTimeout(context.Background(), s.cfg.ResourceServerJoinRingTimeout)
|
||||
defer cancel()
|
||||
if err := ring.WaitInstanceState(lfcCtx, s.searchRing, s.ringLifecycler.GetInstanceID(), ring.JOINING); err != nil {
|
||||
return fmt.Errorf("error switching to JOINING in the ring: %s", err)
|
||||
}
|
||||
s.log.Info("search server is JOINING in the ring")
|
||||
|
||||
if err := s.ringLifecycler.ChangeState(ctx, ring.ACTIVE); err != nil {
|
||||
return fmt.Errorf("error switching to ACTIVE in the ring: %s", err)
|
||||
}
|
||||
s.log.Info("search server is ACTIVE in the ring")
|
||||
}
|
||||
|
||||
// start the gRPC server
|
||||
go func() {
|
||||
err := s.handler.Run(ctx)
|
||||
if err != nil {
|
||||
s.stoppedCh <- err
|
||||
} else {
|
||||
s.stoppedCh <- nil
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *searchService) GetAddress() string {
|
||||
return s.handler.GetAddress()
|
||||
}
|
||||
|
||||
func (s *searchService) running(ctx context.Context) error {
|
||||
select {
|
||||
case err := <-s.stoppedCh:
|
||||
if err != nil && !errors.Is(err, context.Canceled) {
|
||||
return err
|
||||
}
|
||||
case err := <-s.subservicesWatcher.Chan():
|
||||
return fmt.Errorf("subservice failure: %w", err)
|
||||
case <-ctx.Done():
|
||||
close(s.stopCh)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *searchService) stopping(_ error) error {
|
||||
if s.hasSubservices {
|
||||
err := services.StopManagerAndAwaitStopped(context.Background(), s.subservices)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to stop subservices: %w", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func toSearchLifecyclerConfig(cfg *setting.Cfg, logger log.Logger) (ring.BasicLifecyclerConfig, error) {
|
||||
instanceAddr, err := ring.GetInstanceAddr(cfg.MemberlistBindAddr, netutil.PrivateNetworkInterfacesWithFallback([]string{"eth0", "en0"}, logger), logger, true)
|
||||
if err != nil {
|
||||
return ring.BasicLifecyclerConfig{}, err
|
||||
}
|
||||
|
||||
instanceId := cfg.InstanceID
|
||||
if instanceId == "" {
|
||||
hostname, err := os.Hostname()
|
||||
if err != nil {
|
||||
return ring.BasicLifecyclerConfig{}, err
|
||||
}
|
||||
instanceId = hostname
|
||||
}
|
||||
|
||||
_, grpcPortStr, err := net.SplitHostPort(cfg.GRPCServer.Address)
|
||||
if err != nil {
|
||||
return ring.BasicLifecyclerConfig{}, fmt.Errorf("could not get grpc port from grpc server address: %s", err)
|
||||
}
|
||||
|
||||
grpcPort, err := strconv.Atoi(grpcPortStr)
|
||||
if err != nil {
|
||||
return ring.BasicLifecyclerConfig{}, fmt.Errorf("error converting grpc address port to int: %s", err)
|
||||
}
|
||||
|
||||
return ring.BasicLifecyclerConfig{
|
||||
Addr: fmt.Sprintf("%s:%d", instanceAddr, grpcPort),
|
||||
ID: instanceId,
|
||||
HeartbeatPeriod: 15 * time.Second,
|
||||
HeartbeatTimeout: resource.RingHeartbeatTimeout,
|
||||
TokensObservePeriod: 0,
|
||||
NumTokens: resource.RingNumTokens,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// searchHealthService implements the health check for the search service.
|
||||
type searchHealthService struct {
|
||||
searchServer resource.SearchServer
|
||||
}
|
||||
|
||||
func (h *searchHealthService) Check(ctx context.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
|
||||
return &grpc_health_v1.HealthCheckResponse{
|
||||
Status: grpc_health_v1.HealthCheckResponse_SERVING,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (h *searchHealthService) Watch(req *grpc_health_v1.HealthCheckRequest, server grpc_health_v1.Health_WatchServer) error {
|
||||
return fmt.Errorf("watch not implemented")
|
||||
}
|
||||
|
||||
func (h *searchHealthService) List(ctx context.Context, req *grpc_health_v1.HealthListRequest) (*grpc_health_v1.HealthListResponse, error) {
|
||||
check, err := h.Check(ctx, &grpc_health_v1.HealthCheckRequest{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &grpc_health_v1.HealthListResponse{
|
||||
Statuses: map[string]*grpc_health_v1.HealthCheckResponse{
|
||||
"": check,
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
@@ -48,7 +48,7 @@ type ServerOptions struct {
|
||||
OwnsIndexFn func(key resource.NamespacedResource) (bool, error)
|
||||
}
|
||||
|
||||
func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
|
||||
func NewResourceServer(opts ServerOptions) (resource.ResourceServer, resource.SearchServer, error) {
|
||||
apiserverCfg := opts.Cfg.SectionWithEnvOverrides("grafana-apiserver")
|
||||
|
||||
if opts.SecureValues == nil && opts.Cfg != nil && opts.Cfg.SecretsManagement.GrpcClientEnable {
|
||||
@@ -59,7 +59,7 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
|
||||
nil, // not needed for gRPC client mode
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create inline secure value service: %w", err)
|
||||
return nil, nil, fmt.Errorf("failed to create inline secure value service: %w", err)
|
||||
}
|
||||
opts.SecureValues = inlineSecureValueService
|
||||
}
|
||||
@@ -79,7 +79,7 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
|
||||
dir := strings.Replace(serverOptions.Blob.URL, "./data", opts.Cfg.DataPath, 1)
|
||||
err := os.MkdirAll(dir, 0700)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
serverOptions.Blob.URL = "file:///" + dir
|
||||
}
|
||||
@@ -96,13 +96,13 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
|
||||
} else {
|
||||
eDB, err := dbimpl.ProvideResourceDB(opts.DB, opts.Cfg, opts.Tracer)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
if opts.Cfg.EnableSQLKVBackend {
|
||||
sqlkv, err := resource.NewSQLKV(eDB)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error creating sqlkv: %s", err)
|
||||
return nil, nil, fmt.Errorf("error creating sqlkv: %s", err)
|
||||
}
|
||||
|
||||
kvBackendOpts := resource.KVBackendOptions{
|
||||
@@ -114,12 +114,12 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
|
||||
ctx := context.Background()
|
||||
dbConn, err := eDB.Init(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error initializing DB: %w", err)
|
||||
return nil, nil, fmt.Errorf("error initializing DB: %w", err)
|
||||
}
|
||||
|
||||
dialect := sqltemplate.DialectForDriver(dbConn.DriverName())
|
||||
if dialect == nil {
|
||||
return nil, fmt.Errorf("unsupported database driver: %s", dbConn.DriverName())
|
||||
return nil, nil, fmt.Errorf("unsupported database driver: %s", dbConn.DriverName())
|
||||
}
|
||||
|
||||
rvManager, err := rvmanager.NewResourceVersionManager(rvmanager.ResourceManagerOptions{
|
||||
@@ -127,14 +127,14 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
|
||||
DB: dbConn,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create resource version manager: %w", err)
|
||||
return nil, nil, fmt.Errorf("failed to create resource version manager: %w", err)
|
||||
}
|
||||
|
||||
// TODO add config to decide whether to pass RvManager or not
|
||||
kvBackendOpts.RvManager = rvManager
|
||||
kvBackend, err := resource.NewKVStorageBackend(kvBackendOpts)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error creating kv backend: %s", err)
|
||||
return nil, nil, fmt.Errorf("error creating kv backend: %s", err)
|
||||
}
|
||||
|
||||
serverOptions.Backend = kvBackend
|
||||
@@ -151,7 +151,7 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
|
||||
LastImportTimeMaxAge: opts.SearchOptions.MaxIndexAge, // No need to keep last_import_times older than max index age.
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
serverOptions.Backend = backend
|
||||
serverOptions.Diagnostics = backend
|
||||
@@ -159,13 +159,31 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
|
||||
}
|
||||
}
|
||||
|
||||
serverOptions.Search = opts.SearchOptions
|
||||
serverOptions.IndexMetrics = opts.IndexMetrics
|
||||
// Initialize the backend before creating search server (it needs the DB connection)
|
||||
if serverOptions.Lifecycle != nil {
|
||||
if err := serverOptions.Lifecycle.Init(context.Background()); err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to initialize backend: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
search, err := resource.NewSearchServer(opts.SearchOptions, serverOptions.Backend, opts.AccessClient, nil, opts.IndexMetrics, opts.OwnsIndexFn)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to initialize search: %w", err)
|
||||
}
|
||||
|
||||
if err := search.Init(context.Background()); err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to initialize search: %w", err)
|
||||
}
|
||||
|
||||
serverOptions.Search = search
|
||||
serverOptions.QOSQueue = opts.QOSQueue
|
||||
serverOptions.OwnsIndexFn = opts.OwnsIndexFn
|
||||
serverOptions.OverridesService = opts.OverridesService
|
||||
|
||||
return resource.NewResourceServer(serverOptions)
|
||||
rs, err := resource.NewResourceServer(serverOptions)
|
||||
if err != nil {
|
||||
_ = search.Stop(context.Background())
|
||||
}
|
||||
return rs, search, err
|
||||
}
|
||||
|
||||
// isHighAvailabilityEnabled determines if high availability mode should
|
||||
|
||||
@@ -292,10 +292,50 @@ func (s *service) starting(ctx context.Context) error {
|
||||
serverOptions.OverridesService = overridesSvc
|
||||
}
|
||||
|
||||
server, err := NewResourceServer(serverOptions)
|
||||
if err != nil {
|
||||
return err
|
||||
// Handle search mode: "", "embedded", or "remote"
|
||||
// Empty string defaults to "embedded" for backward compatibility
|
||||
var searchServer resource.SearchServer
|
||||
registerSearchServices := true
|
||||
|
||||
switch s.cfg.SearchMode {
|
||||
case "remote":
|
||||
// Use remote search client - don't register search services locally
|
||||
s.log.Info("Using remote search server", "address", s.cfg.SearchServerAddress)
|
||||
remoteSearch, err := newRemoteSearchClient(s.cfg.SearchServerAddress)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create remote search client: %w", err)
|
||||
}
|
||||
searchServer = remoteSearch
|
||||
registerSearchServices = false
|
||||
|
||||
case "", "embedded":
|
||||
// Default: create local search server (backward compatible)
|
||||
s.log.Info("Using embedded search server")
|
||||
// SearchOptions are already configured, NewResourceServer will create the search server
|
||||
|
||||
default:
|
||||
return fmt.Errorf("invalid search_mode: %s (valid values: \"\", \"embedded\", \"remote\")", s.cfg.SearchMode)
|
||||
}
|
||||
|
||||
var server resource.ResourceServer
|
||||
if searchServer != nil {
|
||||
// Remote search mode: pass the remote search client to the resource server
|
||||
// Clear local search options since we're using remote search
|
||||
serverOptions.SearchOptions = resource.SearchOptions{}
|
||||
var err error
|
||||
server, _, err = NewResourceServer(serverOptions)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
// Embedded mode: create both server and search server together
|
||||
var err error
|
||||
server, searchServer, err = NewResourceServer(serverOptions)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
s.handler, err = grpcserver.ProvideService(s.cfg, s.features, interceptors.AuthenticatorFunc(s.authenticator), s.tracing, prometheus.DefaultRegisterer)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -309,8 +349,11 @@ func (s *service) starting(ctx context.Context) error {
|
||||
srv := s.handler.GetServer()
|
||||
resourcepb.RegisterResourceStoreServer(srv, server)
|
||||
resourcepb.RegisterBulkStoreServer(srv, server)
|
||||
resourcepb.RegisterResourceIndexServer(srv, server)
|
||||
resourcepb.RegisterManagedObjectIndexServer(srv, server)
|
||||
// Only register search services if running in embedded mode
|
||||
if registerSearchServices {
|
||||
resourcepb.RegisterResourceIndexServer(srv, searchServer)
|
||||
resourcepb.RegisterManagedObjectIndexServer(srv, searchServer)
|
||||
}
|
||||
resourcepb.RegisterBlobStoreServer(srv, server)
|
||||
resourcepb.RegisterDiagnosticsServer(srv, server)
|
||||
resourcepb.RegisterQuotasServer(srv, server)
|
||||
|
||||
@@ -32,6 +32,7 @@ func RunTestSearchAndStorage(t *testing.T, ctx context.Context, backend resource
|
||||
nsPrefix := "test-ns"
|
||||
|
||||
var server resource.ResourceServer
|
||||
var searchServer resource.SearchServer
|
||||
|
||||
t.Run("Create initial resources in storage", func(t *testing.T) {
|
||||
initialResources := []struct {
|
||||
@@ -96,25 +97,35 @@ func RunTestSearchAndStorage(t *testing.T, ctx context.Context, backend resource
|
||||
})
|
||||
|
||||
t.Run("Create a resource server with both backends", func(t *testing.T) {
|
||||
// Create a resource server with both backends
|
||||
// Create search server first
|
||||
var err error
|
||||
server, err = resource.NewResourceServer(resource.ResourceServerOptions{
|
||||
Backend: backend,
|
||||
Search: resource.SearchOptions{
|
||||
Backend: searchBackend,
|
||||
Resources: &resource.TestDocumentBuilderSupplier{
|
||||
GroupsResources: map[string]string{
|
||||
"test.grafana.app": "testresources",
|
||||
},
|
||||
searchOpts := resource.SearchOptions{
|
||||
Backend: searchBackend,
|
||||
Resources: &resource.TestDocumentBuilderSupplier{
|
||||
GroupsResources: map[string]string{
|
||||
"test.grafana.app": "testresources",
|
||||
},
|
||||
},
|
||||
}
|
||||
searchServer, err = resource.NewSearchServer(searchOpts, backend, nil, nil, nil, nil)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, searchServer)
|
||||
|
||||
// Initialize the search server
|
||||
err = searchServer.Init(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Create a resource server with the search server
|
||||
server, err = resource.NewResourceServer(resource.ResourceServerOptions{
|
||||
Backend: backend,
|
||||
Search: searchServer,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("Search for initial resources", func(t *testing.T) {
|
||||
// Test 1: Search for initial resources
|
||||
searchResp, err := server.Search(ctx, &resourcepb.ResourceSearchRequest{
|
||||
searchResp, err := searchServer.Search(ctx, &resourcepb.ResourceSearchRequest{
|
||||
Options: &resourcepb.ListOptions{
|
||||
Key: &resourcepb.ResourceKey{
|
||||
Group: "test.grafana.app",
|
||||
@@ -194,7 +205,7 @@ func RunTestSearchAndStorage(t *testing.T, ctx context.Context, backend resource
|
||||
})
|
||||
|
||||
t.Run("Search for documents", func(t *testing.T) {
|
||||
searchResp, err := server.Search(ctx, &resourcepb.ResourceSearchRequest{
|
||||
searchResp, err := searchServer.Search(ctx, &resourcepb.ResourceSearchRequest{
|
||||
Options: &resourcepb.ListOptions{
|
||||
Key: &resourcepb.ResourceKey{
|
||||
Group: "test.grafana.app",
|
||||
@@ -212,7 +223,7 @@ func RunTestSearchAndStorage(t *testing.T, ctx context.Context, backend resource
|
||||
})
|
||||
|
||||
t.Run("Search with tags", func(t *testing.T) {
|
||||
searchResp, err := server.Search(ctx, &resourcepb.ResourceSearchRequest{
|
||||
searchResp, err := searchServer.Search(ctx, &resourcepb.ResourceSearchRequest{
|
||||
Options: &resourcepb.ListOptions{
|
||||
Key: &resourcepb.ResourceKey{
|
||||
Group: "test.grafana.app",
|
||||
@@ -231,7 +242,7 @@ func RunTestSearchAndStorage(t *testing.T, ctx context.Context, backend resource
|
||||
require.Equal(t, int64(0), searchResp.TotalHits)
|
||||
|
||||
// this is the correct way of searching by tag
|
||||
searchResp, err = server.Search(ctx, &resourcepb.ResourceSearchRequest{
|
||||
searchResp, err = searchServer.Search(ctx, &resourcepb.ResourceSearchRequest{
|
||||
Options: &resourcepb.ListOptions{
|
||||
Key: &resourcepb.ResourceKey{
|
||||
Group: "test.grafana.app",
|
||||
@@ -253,7 +264,7 @@ func RunTestSearchAndStorage(t *testing.T, ctx context.Context, backend resource
|
||||
})
|
||||
|
||||
t.Run("Search with specific tag", func(t *testing.T) {
|
||||
searchResp, err := server.Search(ctx, &resourcepb.ResourceSearchRequest{
|
||||
searchResp, err := searchServer.Search(ctx, &resourcepb.ResourceSearchRequest{
|
||||
Options: &resourcepb.ListOptions{
|
||||
Key: &resourcepb.ResourceKey{
|
||||
Group: "test.grafana.app",
|
||||
@@ -272,7 +283,7 @@ func RunTestSearchAndStorage(t *testing.T, ctx context.Context, backend resource
|
||||
require.Equal(t, int64(0), searchResp.TotalHits)
|
||||
|
||||
// this is the correct way of searching by tag
|
||||
searchResp, err = server.Search(ctx, &resourcepb.ResourceSearchRequest{
|
||||
searchResp, err = searchServer.Search(ctx, &resourcepb.ResourceSearchRequest{
|
||||
Options: &resourcepb.ListOptions{
|
||||
Key: &resourcepb.ResourceKey{
|
||||
Group: "test.grafana.app",
|
||||
|
||||
Reference in New Issue
Block a user