Compare commits

...

6 Commits

Author SHA1 Message Date
Rafael Paulovic
b0bb71f834 fix: wire 2026-01-14 19:23:10 +01:00
mayor
a7aa55f908 Add search-server target and configurable search mode
- Add SearchServer module target for standalone search service
- Add search_mode config: "", "embedded" (default), "remote"
- Add search_server_address config for remote search server
- Create remote_search.go: gRPC client wrapper for remote search
- Create search_service.go: standalone search gRPC service
- Modify service.go: conditional search mode handling
- Backward compatible: empty/embedded mode works as before

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-14 17:49:03 +01:00
mayor
0a61846b5e Initialize backend before search server
The backend's Init() must be called before search.Init() because
buildIndexes() calls storage.GetResourceStats() which requires
the database connection to be established.

Previously, backend.Init() was called by ResourceServer.Init()
through the Lifecycle hooks, but now search.Init() runs before
ResourceServer is created.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-14 16:58:30 +01:00
mayor
785cc739ee Fix nil pointer in NewSearchServer - use serverOptions.Backend
The backend may be created inside the function and assigned to
serverOptions.Backend, not opts.Backend. Using opts.Backend caused
a nil pointer dereference in searchSupport.buildIndexes.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-14 16:22:42 +01:00
mayor
4913baaf04 Fix compilation errors from SearchServer extraction
- Update NewLocalResourceClient to accept SearchServer parameter
- Add SearchClient methods to ResourceClient interface (client needs both)
- Fix directResourceClient to implement new interface with stub methods
- Update search_and_storage.go test to create SearchServer separately

This continues the work of extracting SearchServer from ResourceServer
to enable independent usage by storage.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-12 17:59:46 +01:00
Peter Štibraný
c8f1efe7c7 Initial prototype extracting SearchServer from ResourceServer. 2026-01-12 17:50:39 +01:00
21 changed files with 650 additions and 129 deletions

View File

@@ -10,6 +10,7 @@ const (
SearchServerRing string = "search-server-ring"
SearchServerDistributor string = "search-server-distributor"
StorageServer string = "storage-server"
SearchServer string = "search-server"
ZanzanaServer string = "zanzana-server"
InstrumentationServer string = "instrumentation-server"
FrontendServer string = "frontend-server"
@@ -21,6 +22,7 @@ var dependencyMap = map[string][]string{
SearchServerRing: {InstrumentationServer, MemberlistKV},
GrafanaAPIServer: {InstrumentationServer},
StorageServer: {InstrumentationServer, SearchServerRing},
SearchServer: {InstrumentationServer, SearchServerRing},
ZanzanaServer: {InstrumentationServer},
SearchServerDistributor: {InstrumentationServer, MemberlistKV, SearchServerRing},
Core: {},

View File

@@ -38,9 +38,9 @@ func (d *directResourceClient) GetBlob(ctx context.Context, in *resourcepb.GetBl
return d.server.GetBlob(ctx, in)
}
// GetStats implements ResourceClient.
// GetStats implements ResourceClient (SearchClient).
func (d *directResourceClient) GetStats(ctx context.Context, in *resourcepb.ResourceStatsRequest, opts ...grpc.CallOption) (*resourcepb.ResourceStatsResponse, error) {
return d.server.GetStats(ctx, in)
return nil, fmt.Errorf("GetStats not supported with direct resource client")
}
// IsHealthy implements ResourceClient.
@@ -53,12 +53,14 @@ func (d *directResourceClient) List(ctx context.Context, in *resourcepb.ListRequ
return d.server.List(ctx, in)
}
// ListManagedObjects implements ResourceClient (SearchClient).
func (d *directResourceClient) ListManagedObjects(ctx context.Context, in *resourcepb.ListManagedObjectsRequest, opts ...grpc.CallOption) (*resourcepb.ListManagedObjectsResponse, error) {
return d.server.ListManagedObjects(ctx, in)
return nil, fmt.Errorf("ListManagedObjects not supported with direct resource client")
}
// CountManagedObjects implements ResourceClient (SearchClient).
func (d *directResourceClient) CountManagedObjects(ctx context.Context, in *resourcepb.CountManagedObjectsRequest, opts ...grpc.CallOption) (*resourcepb.CountManagedObjectsResponse, error) {
return d.server.CountManagedObjects(ctx, in)
return nil, fmt.Errorf("CountManagedObjects not supported with direct resource client")
}
// PutBlob implements ResourceClient.
@@ -71,9 +73,9 @@ func (d *directResourceClient) Read(ctx context.Context, in *resourcepb.ReadRequ
return d.server.Read(ctx, in)
}
// Search implements ResourceClient.
// Search implements ResourceClient (SearchClient).
func (d *directResourceClient) Search(ctx context.Context, in *resourcepb.ResourceSearchRequest, opts ...grpc.CallOption) (*resourcepb.ResourceSearchResponse, error) {
return d.server.Search(ctx, in)
return nil, fmt.Errorf("Search not supported with direct resource client")
}
// Update implements ResourceClient.

View File

@@ -11,6 +11,7 @@ import (
"k8s.io/apiserver/pkg/registry/rest"
"github.com/grafana/authlib/types"
"github.com/grafana/grafana/pkg/apimachinery/utils"
grafanaregistry "github.com/grafana/grafana/pkg/apiserver/registry/generic"
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"

View File

@@ -37,7 +37,6 @@ import (
"github.com/grafana/grafana/pkg/services/folder"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/storage/unified/apistore"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
)
@@ -74,7 +73,7 @@ func RegisterAPIService(cfg *setting.Cfg,
acService accesscontrol.Service,
accessClient authlib.AccessClient,
registerer prometheus.Registerer,
unified resource.ResourceClient,
unified resourcepb.ResourceIndexClient,
zanzanaClient zanzana.Client,
) *FolderAPIBuilder {
builder := &FolderAPIBuilder{
@@ -93,7 +92,7 @@ func RegisterAPIService(cfg *setting.Cfg,
return builder
}
func NewAPIService(ac authlib.AccessClient, searcher resource.ResourceClient, features featuremgmt.FeatureToggles, zanzanaClient zanzana.Client, resourcePermissionsSvc *dynamic.NamespaceableResourceInterface) *FolderAPIBuilder {
func NewAPIService(ac authlib.AccessClient, searcher resourcepb.ResourceIndexClient, features featuremgmt.FeatureToggles, zanzanaClient zanzana.Client, resourcePermissionsSvc *dynamic.NamespaceableResourceInterface) *FolderAPIBuilder {
return &FolderAPIBuilder{
features: features,
accessClient: ac,

View File

@@ -742,7 +742,7 @@ func NewLocalStore(resourceInfo utils.ResourceInfo, scheme *runtime.Scheme, defa
return nil, err
}
client := resource.NewLocalResourceClient(server)
client := resource.NewLocalResourceClient(server, nil)
optsGetter := apistore.NewRESTOptionsGetterForClient(client, nil, defaultOpts.StorageConfig.Config, nil)
store, err := grafanaregistry.NewRegistryStore(scheme, resourceInfo, optsGetter)

View File

@@ -205,6 +205,14 @@ func (s *ModuleServer) Run() error {
return sql.ProvideUnifiedStorageGrpcService(s.cfg, s.features, nil, s.log, s.registerer, docBuilders, s.storageMetrics, s.indexMetrics, s.searchServerRing, s.MemberlistKVConfig, s.httpServerRouter, s.storageBackend)
})
m.RegisterModule(modules.SearchServer, func() (services.Service, error) {
docBuilders, err := InitializeDocumentBuilders(s.cfg)
if err != nil {
return nil, err
}
return sql.ProvideSearchGrpcService(s.cfg, s.features, nil, s.log, s.registerer, docBuilders, s.indexMetrics, s.searchServerRing, s.MemberlistKVConfig, s.storageBackend)
})
m.RegisterModule(modules.ZanzanaServer, func() (services.Service, error) {
return authz.ProvideZanzanaService(s.cfg, s.features, s.registerer)
})

View File

@@ -65,6 +65,7 @@ import (
"github.com/grafana/grafana/pkg/storage/legacysql"
"github.com/grafana/grafana/pkg/storage/unified"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
search2 "github.com/grafana/grafana/pkg/storage/unified/search"
"github.com/grafana/grafana/pkg/storage/unified/search/builders"
"github.com/grafana/grafana/pkg/storage/unified/sql"
@@ -146,6 +147,7 @@ var wireExtsBasicSet = wire.NewSet(
wire.Bind(new(sandbox.Sandbox), new(*sandbox.Service)),
wire.Struct(new(unified.Options), "*"),
unified.ProvideUnifiedStorageClient,
wire.Bind(new(resourcepb.ResourceIndexClient), new(resource.ResourceClient)),
sql.ProvideStorageBackend,
builder.ProvideDefaultBuildHandlerChainFuncFromBuilders,
aggregatorrunner.ProvideNoopAggregatorConfigurator,

View File

@@ -623,6 +623,8 @@ type Cfg struct {
OverridesFilePath string
OverridesReloadInterval time.Duration
EnableSQLKVBackend bool
SearchMode string // "", "embedded", "remote" - empty defaults to embedded
SearchServerAddress string // gRPC address for remote search server
// Secrets Management
SecretsManagement SecretsManagerSettings

View File

@@ -136,6 +136,10 @@ func (cfg *Cfg) setUnifiedStorageConfig() {
// use sqlkv (resource/sqlkv) instead of the sql backend (sql/backend) as the StorageServer
cfg.EnableSQLKVBackend = section.Key("enable_sqlkv_backend").MustBool(false)
// search mode: "", "embedded", "remote" - empty defaults to embedded for backward compatibility
cfg.SearchMode = section.Key("search_mode").MustString("")
cfg.SearchServerAddress = section.Key("search_server_address").String()
cfg.MaxFileIndexAge = section.Key("max_file_index_age").MustDuration(0)
cfg.MinFileIndexBuildVersion = section.Key("min_file_index_build_version").MustString("")
}

View File

@@ -79,7 +79,7 @@ func NewRESTOptionsGetterMemory(originalStorageConfig storagebackend.Config, sec
}
return NewRESTOptionsGetterForClient(
resource.NewLocalResourceClient(server),
resource.NewLocalResourceClient(server, nil),
secrets,
originalStorageConfig,
nil,
@@ -118,7 +118,7 @@ func NewRESTOptionsGetterForFileXX(path string,
}
return NewRESTOptionsGetterForClient(
resource.NewLocalResourceClient(server),
resource.NewLocalResourceClient(server, nil),
nil, // secrets
originalStorageConfig,
nil,

View File

@@ -156,7 +156,7 @@ func testSetup(t testing.TB, opts ...setupOption) (context.Context, storage.Inte
default:
t.Fatalf("unsupported storage type: %s", setupOpts.storageType)
}
client := resource.NewLocalResourceClient(server)
client := resource.NewLocalResourceClient(server, nil)
config := storagebackend.NewDefaultConfig(setupOpts.prefix, setupOpts.codec)
store, destroyFunc, err := apistore.NewStorage(

View File

@@ -21,6 +21,7 @@ import (
"github.com/grafana/dskit/grpcclient"
"github.com/grafana/dskit/middleware"
"github.com/grafana/dskit/services"
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"
infraDB "github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/infra/tracing"
@@ -135,7 +136,7 @@ func newClient(opts options.StorageOptions,
if err != nil {
return nil, err
}
return resource.NewLocalResourceClient(server), nil
return resource.NewLocalResourceClient(server, nil), nil
case options.StorageTypeUnifiedGrpc:
if opts.Address == "" {
@@ -224,11 +225,11 @@ func newClient(opts options.StorageOptions,
serverOptions.OverridesService = overridesSvc
}
server, err := sql.NewResourceServer(serverOptions)
server, searchServer, err := sql.NewResourceServer(serverOptions)
if err != nil {
return nil, err
}
return resource.NewLocalResourceClient(server), nil
return resource.NewLocalResourceClient(server, searchServer), nil
}
}

View File

@@ -31,15 +31,20 @@ import (
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
)
type SearchClient interface {
resourcepb.ResourceIndexClient
resourcepb.ManagedObjectIndexClient
}
//go:generate mockery --name ResourceClient --structname MockResourceClient --inpackage --filename client_mock.go --with-expecter
type ResourceClient interface {
resourcepb.ResourceStoreClient
resourcepb.ResourceIndexClient
resourcepb.ManagedObjectIndexClient
resourcepb.BulkStoreClient
resourcepb.BlobStoreClient
resourcepb.DiagnosticsClient
resourcepb.QuotasClient
// SearchClient methods are included for convenience - the client typically needs both
SearchClient
}
// Internal implementation
@@ -92,16 +97,15 @@ func NewLegacyResourceClient(channel grpc.ClientConnInterface, indexChannel grpc
return newResourceClient(cc, cci)
}
func NewLocalResourceClient(server ResourceServer) ResourceClient {
func NewLocalResourceClient(server ResourceServer, searchServer SearchServer) ResourceClient {
// scenario: local in-proc
channel := &inprocgrpc.Channel{}
indexChannel := &inprocgrpc.Channel{}
tracer := otel.Tracer("github.com/grafana/grafana/pkg/storage/unified/resource")
grpcAuthInt := grpcutils.NewUnsafeAuthenticator(tracer)
for _, desc := range []*grpc.ServiceDesc{
&resourcepb.ResourceStore_ServiceDesc,
&resourcepb.ResourceIndex_ServiceDesc,
&resourcepb.ManagedObjectIndex_ServiceDesc,
&resourcepb.BlobStore_ServiceDesc,
&resourcepb.BulkStore_ServiceDesc,
&resourcepb.Diagnostics_ServiceDesc,
@@ -117,13 +121,31 @@ func NewLocalResourceClient(server ResourceServer) ResourceClient {
)
}
// Register search services on the index channel if searchServer is provided
if searchServer != nil {
for _, desc := range []*grpc.ServiceDesc{
&resourcepb.ResourceIndex_ServiceDesc,
&resourcepb.ManagedObjectIndex_ServiceDesc,
} {
indexChannel.RegisterService(
grpchan.InterceptServer(
desc,
grpcAuth.UnaryServerInterceptor(grpcAuthInt),
grpcAuth.StreamServerInterceptor(grpcAuthInt),
),
searchServer,
)
}
}
clientInt := authnlib.NewGrpcClientInterceptor(
ProvideInProcExchanger(),
authnlib.WithClientInterceptorIDTokenExtractor(idTokenExtractor),
)
cc := grpchan.InterceptClientConn(channel, clientInt.UnaryClientInterceptor, clientInt.StreamClientInterceptor)
return newResourceClient(cc, cc)
cci := grpchan.InterceptClientConn(indexChannel, clientInt.UnaryClientInterceptor, clientInt.StreamClientInterceptor)
return newResourceClient(cc, cci)
}
type RemoteResourceClientConfig struct {

View File

@@ -127,7 +127,10 @@ type SearchBackend interface {
GetOpenIndexes() []NamespacedResource
}
// This supports indexing+search regardless of implementation
var _ SearchServer = &searchSupport{}
// This supports indexing+search regardless of implementation.
// Implements SearchServer interface.
type searchSupport struct {
log log.Logger
storage StorageBackend
@@ -160,6 +163,10 @@ var (
_ resourcepb.ManagedObjectIndexServer = (*searchSupport)(nil)
)
func NewSearchServer(opts SearchOptions, storage StorageBackend, access types.AccessClient, blob BlobSupport, indexMetrics *BleveIndexMetrics, ownsIndexFn func(key NamespacedResource) (bool, error)) (SearchServer, error) {
return newSearchSupport(opts, storage, access, blob, indexMetrics, ownsIndexFn)
}
func newSearchSupport(opts SearchOptions, storage StorageBackend, access types.AccessClient, blob BlobSupport, indexMetrics *BleveIndexMetrics, ownsIndexFn func(key NamespacedResource) (bool, error)) (support *searchSupport, err error) {
// No backend search support
if opts.Backend == nil {
@@ -598,6 +605,15 @@ func (s *searchSupport) buildIndexes(ctx context.Context) (int, error) {
return totalBatchesIndexed, nil
}
func (s *searchSupport) Init(ctx context.Context) error {
return s.init(ctx)
}
func (s *searchSupport) Stop(_ context.Context) error {
s.stop()
return nil
}
func (s *searchSupport) init(ctx context.Context) error {
origCtx := ctx

View File

@@ -60,7 +60,7 @@ func ProvideSearchDistributorServer(cfg *setting.Cfg, features featuremgmt.Featu
}
type RingClient struct {
Client ResourceClient
Client SearchClient
grpc_health_v1.HealthClient
Conn *grpc.ClientConn
}
@@ -99,7 +99,7 @@ var (
func (ds *distributorServer) Search(ctx context.Context, r *resourcepb.ResourceSearchRequest) (*resourcepb.ResourceSearchResponse, error) {
ctx, span := ds.tracing.Start(ctx, "distributor.Search")
defer span.End()
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Options.Key.Namespace, "Search")
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Options.Key.Namespace)
if err != nil {
return nil, err
}
@@ -110,7 +110,7 @@ func (ds *distributorServer) Search(ctx context.Context, r *resourcepb.ResourceS
func (ds *distributorServer) GetStats(ctx context.Context, r *resourcepb.ResourceStatsRequest) (*resourcepb.ResourceStatsResponse, error) {
ctx, span := ds.tracing.Start(ctx, "distributor.GetStats")
defer span.End()
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace, "GetStats")
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace)
if err != nil {
return nil, err
}
@@ -215,7 +215,7 @@ func (ds *distributorServer) RebuildIndexes(ctx context.Context, r *resourcepb.R
func (ds *distributorServer) CountManagedObjects(ctx context.Context, r *resourcepb.CountManagedObjectsRequest) (*resourcepb.CountManagedObjectsResponse, error) {
ctx, span := ds.tracing.Start(ctx, "distributor.CountManagedObjects")
defer span.End()
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace, "CountManagedObjects")
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace)
if err != nil {
return nil, err
}
@@ -226,7 +226,7 @@ func (ds *distributorServer) CountManagedObjects(ctx context.Context, r *resourc
func (ds *distributorServer) ListManagedObjects(ctx context.Context, r *resourcepb.ListManagedObjectsRequest) (*resourcepb.ListManagedObjectsResponse, error) {
ctx, span := ds.tracing.Start(ctx, "distributor.ListManagedObjects")
defer span.End()
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace, "ListManagedObjects")
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace)
if err != nil {
return nil, err
}
@@ -234,7 +234,7 @@ func (ds *distributorServer) ListManagedObjects(ctx context.Context, r *resource
return client.ListManagedObjects(ctx, r)
}
func (ds *distributorServer) getClientToDistributeRequest(ctx context.Context, namespace string, methodName string) (context.Context, ResourceClient, error) {
func (ds *distributorServer) getClientToDistributeRequest(ctx context.Context, namespace string) (context.Context, SearchClient, error) {
ringHasher := fnv.New32a()
_, err := ringHasher.Write([]byte(namespace))
if err != nil {

View File

@@ -34,12 +34,17 @@ import (
var tracer = otel.Tracer("github.com/grafana/grafana/pkg/storage/unified/resource")
type SearchServer interface {
LifecycleHooks
resourcepb.ResourceIndexServer
resourcepb.ManagedObjectIndexServer
}
// ResourceServer implements all gRPC services
type ResourceServer interface {
resourcepb.ResourceStoreServer
resourcepb.BulkStoreServer
resourcepb.ResourceIndexServer
resourcepb.ManagedObjectIndexServer
resourcepb.BlobStoreServer
resourcepb.DiagnosticsServer
resourcepb.QuotasServer
@@ -222,7 +227,8 @@ type ResourceServerOptions struct {
Blob BlobConfig
// Search options
Search SearchOptions
SearchOptions SearchOptions // TODO: needed?
Search SearchServer
// Quota service
OverridesService *OverridesService
@@ -251,16 +257,12 @@ type ResourceServerOptions struct {
storageMetrics *StorageMetrics
IndexMetrics *BleveIndexMetrics
// MaxPageSizeBytes is the maximum size of a page in bytes.
MaxPageSizeBytes int
// QOSQueue is the quality of service queue used to enqueue
QOSQueue QOSEnqueuer
QOSConfig QueueConfig
OwnsIndexFn func(key NamespacedResource) (bool, error)
}
func NewResourceServer(opts ResourceServerOptions) (*server, error) {
@@ -343,23 +345,25 @@ func NewResourceServer(opts ResourceServerOptions) (*server, error) {
ctx: ctx,
cancel: cancel,
storageMetrics: opts.storageMetrics,
indexMetrics: opts.IndexMetrics,
maxPageSizeBytes: opts.MaxPageSizeBytes,
reg: opts.Reg,
queue: opts.QOSQueue,
queueConfig: opts.QOSConfig,
overridesService: opts.OverridesService,
search: opts.Search,
artificialSuccessfulWriteDelay: opts.Search.IndexMinUpdateInterval,
artificialSuccessfulWriteDelay: opts.SearchOptions.IndexMinUpdateInterval,
}
if opts.Search.Resources != nil {
var err error
s.search, err = newSearchSupport(opts.Search, s.backend, s.access, s.blob, opts.IndexMetrics, opts.OwnsIndexFn)
if err != nil {
return nil, err
/*
if opts.Search.Resources != nil {
var err error
s.search, err = newSearchSupport(opts.Search, s.backend, s.access, s.blob, opts.IndexMetrics, opts.OwnsIndexFn)
if err != nil {
return nil, err
}
}
}
*/
err := s.Init(ctx)
if err != nil {
@@ -377,7 +381,7 @@ type server struct {
backend StorageBackend
blob BlobSupport
secure secrets.InlineSecureValueSupport
search *searchSupport
search SearchServer
diagnostics resourcepb.DiagnosticsServer
access claims.AccessClient
writeHooks WriteAccessHooks
@@ -424,11 +428,6 @@ func (s *server) Init(ctx context.Context) error {
s.initErr = s.overridesService.init(ctx)
}
// initialize the search index
if s.initErr == nil && s.search != nil {
s.initErr = s.search.init(ctx)
}
// Start watching for changes
if s.initErr == nil {
s.initErr = s.initWatcher()
@@ -453,10 +452,6 @@ func (s *server) Stop(ctx context.Context) error {
}
}
if s.search != nil {
s.search.stop()
}
if s.overridesService != nil {
if err := s.overridesService.stop(ctx); err != nil {
stopFailed = true
@@ -1372,47 +1367,6 @@ func (s *server) Watch(req *resourcepb.WatchRequest, srv resourcepb.ResourceStor
}
}
func (s *server) Search(ctx context.Context, req *resourcepb.ResourceSearchRequest) (*resourcepb.ResourceSearchResponse, error) {
if s.search == nil {
return nil, fmt.Errorf("search index not configured")
}
return s.search.Search(ctx, req)
}
// GetStats implements ResourceServer.
func (s *server) GetStats(ctx context.Context, req *resourcepb.ResourceStatsRequest) (*resourcepb.ResourceStatsResponse, error) {
if err := s.Init(ctx); err != nil {
return nil, err
}
if s.search == nil {
// If the backend implements "GetStats", we can use it
srv, ok := s.backend.(resourcepb.ResourceIndexServer)
if ok {
return srv.GetStats(ctx, req)
}
return nil, fmt.Errorf("search index not configured")
}
return s.search.GetStats(ctx, req)
}
func (s *server) ListManagedObjects(ctx context.Context, req *resourcepb.ListManagedObjectsRequest) (*resourcepb.ListManagedObjectsResponse, error) {
if s.search == nil {
return nil, fmt.Errorf("search index not configured")
}
return s.search.ListManagedObjects(ctx, req)
}
func (s *server) CountManagedObjects(ctx context.Context, req *resourcepb.CountManagedObjectsRequest) (*resourcepb.CountManagedObjectsResponse, error) {
if s.search == nil {
return nil, fmt.Errorf("search index not configured")
}
return s.search.CountManagedObjects(ctx, req)
}
// IsHealthy implements ResourceServer.
func (s *server) IsHealthy(ctx context.Context, req *resourcepb.HealthCheckRequest) (*resourcepb.HealthCheckResponse, error) {
return s.diagnostics.IsHealthy(ctx, req)

View File

@@ -0,0 +1,83 @@
package sql
import (
"context"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
)
var _ resource.SearchServer = (*remoteSearchClient)(nil)
// remoteSearchClient wraps gRPC search clients to implement the SearchServer interface.
// This allows the storage server to delegate search operations to a remote search server.
type remoteSearchClient struct {
conn *grpc.ClientConn
index resourcepb.ResourceIndexClient
moiClient resourcepb.ManagedObjectIndexClient
}
// newRemoteSearchClient creates a new remote search client that connects to a search server at the given address.
func newRemoteSearchClient(address string) (*remoteSearchClient, error) {
if address == "" {
return nil, fmt.Errorf("search server address is required for remote search mode")
}
conn, err := grpc.NewClient(address,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`),
)
if err != nil {
return nil, fmt.Errorf("failed to create gRPC connection to search server: %w", err)
}
return &remoteSearchClient{
conn: conn,
index: resourcepb.NewResourceIndexClient(conn),
moiClient: resourcepb.NewManagedObjectIndexClient(conn),
}, nil
}
// Init implements resource.LifecycleHooks.
// For remote search, there's nothing to initialize locally.
func (r *remoteSearchClient) Init(ctx context.Context) error {
return nil
}
// Stop implements resource.LifecycleHooks.
// Closes the gRPC connection.
func (r *remoteSearchClient) Stop(ctx context.Context) error {
if r.conn != nil {
return r.conn.Close()
}
return nil
}
// Search implements resourcepb.ResourceIndexServer.
func (r *remoteSearchClient) Search(ctx context.Context, req *resourcepb.ResourceSearchRequest) (*resourcepb.ResourceSearchResponse, error) {
return r.index.Search(ctx, req)
}
// GetStats implements resourcepb.ResourceIndexServer.
func (r *remoteSearchClient) GetStats(ctx context.Context, req *resourcepb.ResourceStatsRequest) (*resourcepb.ResourceStatsResponse, error) {
return r.index.GetStats(ctx, req)
}
// RebuildIndexes implements resourcepb.ResourceIndexServer.
func (r *remoteSearchClient) RebuildIndexes(ctx context.Context, req *resourcepb.RebuildIndexesRequest) (*resourcepb.RebuildIndexesResponse, error) {
return r.index.RebuildIndexes(ctx, req)
}
// CountManagedObjects implements resourcepb.ManagedObjectIndexServer.
func (r *remoteSearchClient) CountManagedObjects(ctx context.Context, req *resourcepb.CountManagedObjectsRequest) (*resourcepb.CountManagedObjectsResponse, error) {
return r.moiClient.CountManagedObjects(ctx, req)
}
// ListManagedObjects implements resourcepb.ManagedObjectIndexServer.
func (r *remoteSearchClient) ListManagedObjects(ctx context.Context, req *resourcepb.ListManagedObjectsRequest) (*resourcepb.ListManagedObjectsResponse, error) {
return r.moiClient.ListManagedObjects(ctx, req)
}

View File

@@ -0,0 +1,353 @@
package sql
import (
"context"
"errors"
"fmt"
"hash/fnv"
"net"
"os"
"strconv"
"time"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc/health/grpc_health_v1"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/netutil"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
infraDB "github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/modules"
"github.com/grafana/grafana/pkg/services/authz"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/services/grpcserver"
"github.com/grafana/grafana/pkg/services/grpcserver/interceptors"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/resource/grpc"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
"github.com/grafana/grafana/pkg/storage/unified/search"
)
var (
_ SearchGrpcService = (*searchService)(nil)
)
// SearchGrpcService is the interface for the standalone search gRPC service.
type SearchGrpcService interface {
services.NamedService
// GetAddress returns the address where this service is running
GetAddress() string
}
type searchService struct {
*services.BasicService
// Subservices manager
subservices *services.Manager
subservicesWatcher *services.FailureWatcher
hasSubservices bool
cfg *setting.Cfg
features featuremgmt.FeatureToggles
db infraDB.DB
stopCh chan struct{}
stoppedCh chan error
handler grpcserver.Provider
tracing trace.Tracer
authenticator func(ctx context.Context) (context.Context, error)
log log.Logger
reg prometheus.Registerer
indexMetrics *resource.BleveIndexMetrics
docBuilders resource.DocumentBuilderSupplier
searchRing *ring.Ring
ringLifecycler *ring.BasicLifecycler
backend resource.StorageBackend
}
// ProvideSearchGrpcService creates a standalone search gRPC service.
// This is used when running search-server as a separate target.
func ProvideSearchGrpcService(
cfg *setting.Cfg,
features featuremgmt.FeatureToggles,
db infraDB.DB,
log log.Logger,
reg prometheus.Registerer,
docBuilders resource.DocumentBuilderSupplier,
indexMetrics *resource.BleveIndexMetrics,
searchRing *ring.Ring,
memberlistKVConfig kv.Config,
backend resource.StorageBackend,
) (SearchGrpcService, error) {
tracer := otel.Tracer("search-server")
authn := NewAuthenticatorWithFallback(cfg, reg, tracer, func(ctx context.Context) (context.Context, error) {
auth := grpc.Authenticator{Tracer: tracer}
return auth.Authenticate(ctx)
})
s := &searchService{
cfg: cfg,
features: features,
stopCh: make(chan struct{}),
stoppedCh: make(chan error, 1),
authenticator: authn,
tracing: tracer,
db: db,
log: log,
reg: reg,
docBuilders: docBuilders,
indexMetrics: indexMetrics,
searchRing: searchRing,
subservicesWatcher: services.NewFailureWatcher(),
backend: backend,
}
subservices := []services.Service{}
if cfg.EnableSharding {
ringStore, err := kv.NewClient(
memberlistKVConfig,
ring.GetCodec(),
kv.RegistererWithKVName(reg, resource.RingName),
log,
)
if err != nil {
return nil, fmt.Errorf("failed to create KV store client: %s", err)
}
lifecyclerCfg, err := toSearchLifecyclerConfig(cfg, log)
if err != nil {
return nil, fmt.Errorf("failed to initialize search-ring lifecycler config: %s", err)
}
delegate := ring.BasicLifecyclerDelegate(ring.NewInstanceRegisterDelegate(ring.JOINING, resource.RingNumTokens))
delegate = ring.NewLeaveOnStoppingDelegate(delegate, log)
delegate = ring.NewAutoForgetDelegate(resource.RingHeartbeatTimeout*2, delegate, log)
s.ringLifecycler, err = ring.NewBasicLifecycler(
lifecyclerCfg,
resource.RingName,
resource.RingKey,
ringStore,
delegate,
log,
reg,
)
if err != nil {
return nil, fmt.Errorf("failed to initialize search-ring lifecycler: %s", err)
}
s.ringLifecycler.SetKeepInstanceInTheRingOnShutdown(true)
subservices = append(subservices, s.ringLifecycler)
}
if len(subservices) > 0 {
s.hasSubservices = true
var err error
s.subservices, err = services.NewManager(subservices...)
if err != nil {
return nil, fmt.Errorf("failed to create subservices manager: %w", err)
}
}
s.BasicService = services.NewBasicService(s.starting, s.running, s.stopping).WithName(modules.SearchServer)
return s, nil
}
func (s *searchService) OwnsIndex(key resource.NamespacedResource) (bool, error) {
if s.searchRing == nil {
return true, nil
}
if st := s.searchRing.State(); st != services.Running {
return false, fmt.Errorf("ring is not Running: %s", st)
}
ringHasher := fnv.New32a()
_, err := ringHasher.Write([]byte(key.Namespace))
if err != nil {
return false, fmt.Errorf("error hashing namespace: %w", err)
}
rs, err := s.searchRing.GetWithOptions(ringHasher.Sum32(), searchOwnerRead, ring.WithReplicationFactor(s.searchRing.ReplicationFactor()))
if err != nil {
return false, fmt.Errorf("error getting replicaset from ring: %w", err)
}
return rs.Includes(s.ringLifecycler.GetInstanceAddr()), nil
}
func (s *searchService) starting(ctx context.Context) error {
if s.hasSubservices {
s.subservicesWatcher.WatchManager(s.subservices)
if err := services.StartManagerAndAwaitHealthy(ctx, s.subservices); err != nil {
return fmt.Errorf("failed to start subservices: %w", err)
}
}
authzClient, err := authz.ProvideStandaloneAuthZClient(s.cfg, s.features, s.tracing, s.reg)
if err != nil {
return err
}
searchOptions, err := search.NewSearchOptions(s.features, s.cfg, s.docBuilders, s.indexMetrics, s.OwnsIndex)
if err != nil {
return err
}
// Create search server
searchServer, err := resource.NewSearchServer(searchOptions, s.backend, authzClient, nil, s.indexMetrics, s.OwnsIndex)
if err != nil {
return fmt.Errorf("failed to create search server: %w", err)
}
if err := searchServer.Init(ctx); err != nil {
return fmt.Errorf("failed to initialize search server: %w", err)
}
s.handler, err = grpcserver.ProvideService(s.cfg, s.features, interceptors.AuthenticatorFunc(s.authenticator), s.tracing, prometheus.DefaultRegisterer)
if err != nil {
return err
}
srv := s.handler.GetServer()
resourcepb.RegisterResourceIndexServer(srv, searchServer)
resourcepb.RegisterManagedObjectIndexServer(srv, searchServer)
grpc_health_v1.RegisterHealthServer(srv, &searchHealthService{searchServer: searchServer})
// register reflection service
_, err = grpcserver.ProvideReflectionService(s.cfg, s.handler)
if err != nil {
return err
}
if s.cfg.EnableSharding {
s.log.Info("waiting until search server is JOINING in the ring")
lfcCtx, cancel := context.WithTimeout(context.Background(), s.cfg.ResourceServerJoinRingTimeout)
defer cancel()
if err := ring.WaitInstanceState(lfcCtx, s.searchRing, s.ringLifecycler.GetInstanceID(), ring.JOINING); err != nil {
return fmt.Errorf("error switching to JOINING in the ring: %s", err)
}
s.log.Info("search server is JOINING in the ring")
if err := s.ringLifecycler.ChangeState(ctx, ring.ACTIVE); err != nil {
return fmt.Errorf("error switching to ACTIVE in the ring: %s", err)
}
s.log.Info("search server is ACTIVE in the ring")
}
// start the gRPC server
go func() {
err := s.handler.Run(ctx)
if err != nil {
s.stoppedCh <- err
} else {
s.stoppedCh <- nil
}
}()
return nil
}
func (s *searchService) GetAddress() string {
return s.handler.GetAddress()
}
func (s *searchService) running(ctx context.Context) error {
select {
case err := <-s.stoppedCh:
if err != nil && !errors.Is(err, context.Canceled) {
return err
}
case err := <-s.subservicesWatcher.Chan():
return fmt.Errorf("subservice failure: %w", err)
case <-ctx.Done():
close(s.stopCh)
}
return nil
}
func (s *searchService) stopping(_ error) error {
if s.hasSubservices {
err := services.StopManagerAndAwaitStopped(context.Background(), s.subservices)
if err != nil {
return fmt.Errorf("failed to stop subservices: %w", err)
}
}
return nil
}
func toSearchLifecyclerConfig(cfg *setting.Cfg, logger log.Logger) (ring.BasicLifecyclerConfig, error) {
instanceAddr, err := ring.GetInstanceAddr(cfg.MemberlistBindAddr, netutil.PrivateNetworkInterfacesWithFallback([]string{"eth0", "en0"}, logger), logger, true)
if err != nil {
return ring.BasicLifecyclerConfig{}, err
}
instanceId := cfg.InstanceID
if instanceId == "" {
hostname, err := os.Hostname()
if err != nil {
return ring.BasicLifecyclerConfig{}, err
}
instanceId = hostname
}
_, grpcPortStr, err := net.SplitHostPort(cfg.GRPCServer.Address)
if err != nil {
return ring.BasicLifecyclerConfig{}, fmt.Errorf("could not get grpc port from grpc server address: %s", err)
}
grpcPort, err := strconv.Atoi(grpcPortStr)
if err != nil {
return ring.BasicLifecyclerConfig{}, fmt.Errorf("error converting grpc address port to int: %s", err)
}
return ring.BasicLifecyclerConfig{
Addr: fmt.Sprintf("%s:%d", instanceAddr, grpcPort),
ID: instanceId,
HeartbeatPeriod: 15 * time.Second,
HeartbeatTimeout: resource.RingHeartbeatTimeout,
TokensObservePeriod: 0,
NumTokens: resource.RingNumTokens,
}, nil
}
// searchHealthService implements the health check for the search service.
type searchHealthService struct {
searchServer resource.SearchServer
}
func (h *searchHealthService) Check(ctx context.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
return &grpc_health_v1.HealthCheckResponse{
Status: grpc_health_v1.HealthCheckResponse_SERVING,
}, nil
}
func (h *searchHealthService) Watch(req *grpc_health_v1.HealthCheckRequest, server grpc_health_v1.Health_WatchServer) error {
return fmt.Errorf("watch not implemented")
}
func (h *searchHealthService) List(ctx context.Context, req *grpc_health_v1.HealthListRequest) (*grpc_health_v1.HealthListResponse, error) {
check, err := h.Check(ctx, &grpc_health_v1.HealthCheckRequest{})
if err != nil {
return nil, err
}
return &grpc_health_v1.HealthListResponse{
Statuses: map[string]*grpc_health_v1.HealthCheckResponse{
"": check,
},
}, nil
}

View File

@@ -48,7 +48,7 @@ type ServerOptions struct {
OwnsIndexFn func(key resource.NamespacedResource) (bool, error)
}
func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
func NewResourceServer(opts ServerOptions) (resource.ResourceServer, resource.SearchServer, error) {
apiserverCfg := opts.Cfg.SectionWithEnvOverrides("grafana-apiserver")
if opts.SecureValues == nil && opts.Cfg != nil && opts.Cfg.SecretsManagement.GrpcClientEnable {
@@ -59,7 +59,7 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
nil, // not needed for gRPC client mode
)
if err != nil {
return nil, fmt.Errorf("failed to create inline secure value service: %w", err)
return nil, nil, fmt.Errorf("failed to create inline secure value service: %w", err)
}
opts.SecureValues = inlineSecureValueService
}
@@ -79,7 +79,7 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
dir := strings.Replace(serverOptions.Blob.URL, "./data", opts.Cfg.DataPath, 1)
err := os.MkdirAll(dir, 0700)
if err != nil {
return nil, err
return nil, nil, err
}
serverOptions.Blob.URL = "file:///" + dir
}
@@ -96,13 +96,13 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
} else {
eDB, err := dbimpl.ProvideResourceDB(opts.DB, opts.Cfg, opts.Tracer)
if err != nil {
return nil, err
return nil, nil, err
}
if opts.Cfg.EnableSQLKVBackend {
sqlkv, err := resource.NewSQLKV(eDB)
if err != nil {
return nil, fmt.Errorf("error creating sqlkv: %s", err)
return nil, nil, fmt.Errorf("error creating sqlkv: %s", err)
}
kvBackendOpts := resource.KVBackendOptions{
@@ -114,12 +114,12 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
ctx := context.Background()
dbConn, err := eDB.Init(ctx)
if err != nil {
return nil, fmt.Errorf("error initializing DB: %w", err)
return nil, nil, fmt.Errorf("error initializing DB: %w", err)
}
dialect := sqltemplate.DialectForDriver(dbConn.DriverName())
if dialect == nil {
return nil, fmt.Errorf("unsupported database driver: %s", dbConn.DriverName())
return nil, nil, fmt.Errorf("unsupported database driver: %s", dbConn.DriverName())
}
rvManager, err := rvmanager.NewResourceVersionManager(rvmanager.ResourceManagerOptions{
@@ -127,14 +127,14 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
DB: dbConn,
})
if err != nil {
return nil, fmt.Errorf("failed to create resource version manager: %w", err)
return nil, nil, fmt.Errorf("failed to create resource version manager: %w", err)
}
// TODO add config to decide whether to pass RvManager or not
kvBackendOpts.RvManager = rvManager
kvBackend, err := resource.NewKVStorageBackend(kvBackendOpts)
if err != nil {
return nil, fmt.Errorf("error creating kv backend: %s", err)
return nil, nil, fmt.Errorf("error creating kv backend: %s", err)
}
serverOptions.Backend = kvBackend
@@ -151,7 +151,7 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
LastImportTimeMaxAge: opts.SearchOptions.MaxIndexAge, // No need to keep last_import_times older than max index age.
})
if err != nil {
return nil, err
return nil, nil, err
}
serverOptions.Backend = backend
serverOptions.Diagnostics = backend
@@ -159,13 +159,31 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
}
}
serverOptions.Search = opts.SearchOptions
serverOptions.IndexMetrics = opts.IndexMetrics
// Initialize the backend before creating search server (it needs the DB connection)
if serverOptions.Lifecycle != nil {
if err := serverOptions.Lifecycle.Init(context.Background()); err != nil {
return nil, nil, fmt.Errorf("failed to initialize backend: %w", err)
}
}
search, err := resource.NewSearchServer(opts.SearchOptions, serverOptions.Backend, opts.AccessClient, nil, opts.IndexMetrics, opts.OwnsIndexFn)
if err != nil {
return nil, nil, fmt.Errorf("failed to initialize search: %w", err)
}
if err := search.Init(context.Background()); err != nil {
return nil, nil, fmt.Errorf("failed to initialize search: %w", err)
}
serverOptions.Search = search
serverOptions.QOSQueue = opts.QOSQueue
serverOptions.OwnsIndexFn = opts.OwnsIndexFn
serverOptions.OverridesService = opts.OverridesService
return resource.NewResourceServer(serverOptions)
rs, err := resource.NewResourceServer(serverOptions)
if err != nil {
_ = search.Stop(context.Background())
}
return rs, search, err
}
// isHighAvailabilityEnabled determines if high availability mode should

View File

@@ -292,10 +292,50 @@ func (s *service) starting(ctx context.Context) error {
serverOptions.OverridesService = overridesSvc
}
server, err := NewResourceServer(serverOptions)
if err != nil {
return err
// Handle search mode: "", "embedded", or "remote"
// Empty string defaults to "embedded" for backward compatibility
var searchServer resource.SearchServer
registerSearchServices := true
switch s.cfg.SearchMode {
case "remote":
// Use remote search client - don't register search services locally
s.log.Info("Using remote search server", "address", s.cfg.SearchServerAddress)
remoteSearch, err := newRemoteSearchClient(s.cfg.SearchServerAddress)
if err != nil {
return fmt.Errorf("failed to create remote search client: %w", err)
}
searchServer = remoteSearch
registerSearchServices = false
case "", "embedded":
// Default: create local search server (backward compatible)
s.log.Info("Using embedded search server")
// SearchOptions are already configured, NewResourceServer will create the search server
default:
return fmt.Errorf("invalid search_mode: %s (valid values: \"\", \"embedded\", \"remote\")", s.cfg.SearchMode)
}
var server resource.ResourceServer
if searchServer != nil {
// Remote search mode: pass the remote search client to the resource server
// Clear local search options since we're using remote search
serverOptions.SearchOptions = resource.SearchOptions{}
var err error
server, _, err = NewResourceServer(serverOptions)
if err != nil {
return err
}
} else {
// Embedded mode: create both server and search server together
var err error
server, searchServer, err = NewResourceServer(serverOptions)
if err != nil {
return err
}
}
s.handler, err = grpcserver.ProvideService(s.cfg, s.features, interceptors.AuthenticatorFunc(s.authenticator), s.tracing, prometheus.DefaultRegisterer)
if err != nil {
return err
@@ -309,8 +349,11 @@ func (s *service) starting(ctx context.Context) error {
srv := s.handler.GetServer()
resourcepb.RegisterResourceStoreServer(srv, server)
resourcepb.RegisterBulkStoreServer(srv, server)
resourcepb.RegisterResourceIndexServer(srv, server)
resourcepb.RegisterManagedObjectIndexServer(srv, server)
// Only register search services if running in embedded mode
if registerSearchServices {
resourcepb.RegisterResourceIndexServer(srv, searchServer)
resourcepb.RegisterManagedObjectIndexServer(srv, searchServer)
}
resourcepb.RegisterBlobStoreServer(srv, server)
resourcepb.RegisterDiagnosticsServer(srv, server)
resourcepb.RegisterQuotasServer(srv, server)

View File

@@ -32,6 +32,7 @@ func RunTestSearchAndStorage(t *testing.T, ctx context.Context, backend resource
nsPrefix := "test-ns"
var server resource.ResourceServer
var searchServer resource.SearchServer
t.Run("Create initial resources in storage", func(t *testing.T) {
initialResources := []struct {
@@ -96,25 +97,35 @@ func RunTestSearchAndStorage(t *testing.T, ctx context.Context, backend resource
})
t.Run("Create a resource server with both backends", func(t *testing.T) {
// Create a resource server with both backends
// Create search server first
var err error
server, err = resource.NewResourceServer(resource.ResourceServerOptions{
Backend: backend,
Search: resource.SearchOptions{
Backend: searchBackend,
Resources: &resource.TestDocumentBuilderSupplier{
GroupsResources: map[string]string{
"test.grafana.app": "testresources",
},
searchOpts := resource.SearchOptions{
Backend: searchBackend,
Resources: &resource.TestDocumentBuilderSupplier{
GroupsResources: map[string]string{
"test.grafana.app": "testresources",
},
},
}
searchServer, err = resource.NewSearchServer(searchOpts, backend, nil, nil, nil, nil)
require.NoError(t, err)
require.NotNil(t, searchServer)
// Initialize the search server
err = searchServer.Init(ctx)
require.NoError(t, err)
// Create a resource server with the search server
server, err = resource.NewResourceServer(resource.ResourceServerOptions{
Backend: backend,
Search: searchServer,
})
require.NoError(t, err)
})
t.Run("Search for initial resources", func(t *testing.T) {
// Test 1: Search for initial resources
searchResp, err := server.Search(ctx, &resourcepb.ResourceSearchRequest{
searchResp, err := searchServer.Search(ctx, &resourcepb.ResourceSearchRequest{
Options: &resourcepb.ListOptions{
Key: &resourcepb.ResourceKey{
Group: "test.grafana.app",
@@ -194,7 +205,7 @@ func RunTestSearchAndStorage(t *testing.T, ctx context.Context, backend resource
})
t.Run("Search for documents", func(t *testing.T) {
searchResp, err := server.Search(ctx, &resourcepb.ResourceSearchRequest{
searchResp, err := searchServer.Search(ctx, &resourcepb.ResourceSearchRequest{
Options: &resourcepb.ListOptions{
Key: &resourcepb.ResourceKey{
Group: "test.grafana.app",
@@ -212,7 +223,7 @@ func RunTestSearchAndStorage(t *testing.T, ctx context.Context, backend resource
})
t.Run("Search with tags", func(t *testing.T) {
searchResp, err := server.Search(ctx, &resourcepb.ResourceSearchRequest{
searchResp, err := searchServer.Search(ctx, &resourcepb.ResourceSearchRequest{
Options: &resourcepb.ListOptions{
Key: &resourcepb.ResourceKey{
Group: "test.grafana.app",
@@ -231,7 +242,7 @@ func RunTestSearchAndStorage(t *testing.T, ctx context.Context, backend resource
require.Equal(t, int64(0), searchResp.TotalHits)
// this is the correct way of searching by tag
searchResp, err = server.Search(ctx, &resourcepb.ResourceSearchRequest{
searchResp, err = searchServer.Search(ctx, &resourcepb.ResourceSearchRequest{
Options: &resourcepb.ListOptions{
Key: &resourcepb.ResourceKey{
Group: "test.grafana.app",
@@ -253,7 +264,7 @@ func RunTestSearchAndStorage(t *testing.T, ctx context.Context, backend resource
})
t.Run("Search with specific tag", func(t *testing.T) {
searchResp, err := server.Search(ctx, &resourcepb.ResourceSearchRequest{
searchResp, err := searchServer.Search(ctx, &resourcepb.ResourceSearchRequest{
Options: &resourcepb.ListOptions{
Key: &resourcepb.ResourceKey{
Group: "test.grafana.app",
@@ -272,7 +283,7 @@ func RunTestSearchAndStorage(t *testing.T, ctx context.Context, backend resource
require.Equal(t, int64(0), searchResp.TotalHits)
// this is the correct way of searching by tag
searchResp, err = server.Search(ctx, &resourcepb.ResourceSearchRequest{
searchResp, err = searchServer.Search(ctx, &resourcepb.ResourceSearchRequest{
Options: &resourcepb.ListOptions{
Key: &resourcepb.ResourceKey{
Group: "test.grafana.app",