Compare commits

...

2 Commits

Author SHA1 Message Date
Peter Štibraný
c7293f6fb5 Initial prototype extracting SearchServer from ResourceServer. 2025-12-16 09:41:56 +01:00
Peter Štibraný
584287dc61 Add support for LIST with filtering on selectable fields 2025-12-15 12:10:43 +01:00
32 changed files with 469 additions and 161 deletions

View File

@@ -8,6 +8,12 @@ foldersV1beta1: {
spec: {
title: string
description?: string
foo: bool
bar: int
}
}
selectableFields: [
"spec.title",
]
}

View File

@@ -5,13 +5,26 @@
package v1beta1
import (
"errors"
"github.com/grafana/grafana-app-sdk/resource"
)
// schema is unexported to prevent accidental overwrites
var (
schemaFolder = resource.NewSimpleSchema("folder.grafana.app", "v1beta1", NewFolder(), &FolderList{}, resource.WithKind("Folder"),
resource.WithPlural("folders"), resource.WithScope(resource.NamespacedScope))
resource.WithPlural("folders"), resource.WithScope(resource.NamespacedScope), resource.WithSelectableFields([]resource.SelectableField{resource.SelectableField{
FieldSelector: "spec.title",
FieldValueFunc: func(o resource.Object) (string, error) {
cast, ok := o.(*Folder)
if !ok {
return "", errors.New("provided object must be of type *Folder")
}
return cast.Spec.Title, nil
},
},
}))
kindFolder = resource.Kind{
Schema: schemaFolder,
Codecs: map[resource.KindEncoding]resource.Codec{

View File

@@ -18,6 +18,8 @@ import (
v1beta1 "github.com/grafana/grafana/apps/folder/pkg/apis/folder/v1beta1"
)
var ()
var appManifestData = app.ManifestData{
AppName: "folder",
Group: "folder.grafana.app",
@@ -32,6 +34,9 @@ var appManifestData = app.ManifestData{
Plural: "Folders",
Scope: "Namespaced",
Conversion: false,
SelectableFields: []string{
"spec.title",
},
},
},
Routes: app.ManifestVersionRoutes{

View File

@@ -74,6 +74,10 @@ var appManifestData = app.ManifestData{
Plural: "Users",
Scope: "Namespaced",
Conversion: false,
SelectableFields: []string{
"spec.email",
"spec.login",
},
},
{

View File

@@ -4,11 +4,24 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/registry/generic"
"k8s.io/apiserver/pkg/registry/generic/registry"
"k8s.io/apiserver/pkg/storage"
"github.com/grafana/grafana/pkg/apimachinery/utils"
)
func NewRegistryStore(scheme *runtime.Scheme, resourceInfo utils.ResourceInfo, optsGetter generic.RESTOptionsGetter) (*registry.Store, error) {
type registryStoreOptions struct {
attrFunc storage.AttrFunc
}
type OptionFn func(*registryStoreOptions)
func WithAttrFunc(attrFunc storage.AttrFunc) OptionFn {
return func(opts *registryStoreOptions) {
opts.attrFunc = attrFunc
}
}
func NewRegistryStore(scheme *runtime.Scheme, resourceInfo utils.ResourceInfo, optsGetter generic.RESTOptionsGetter, options ...OptionFn) (*registry.Store, error) {
gv := resourceInfo.GroupVersion()
gv.Version = runtime.APIVersionInternal
strategy := NewStrategy(scheme, gv)
@@ -20,7 +33,7 @@ func NewRegistryStore(scheme *runtime.Scheme, resourceInfo utils.ResourceInfo, o
NewListFunc: resourceInfo.NewListFunc,
KeyRootFunc: KeyRootFunc(resourceInfo.GroupResource()),
KeyFunc: NamespaceKeyFunc(resourceInfo.GroupResource()),
PredicateFunc: Matcher,
//PredicateFunc: Matcher,
DefaultQualifiedResource: resourceInfo.GroupResource(),
SingularQualifiedResource: resourceInfo.SingularGroupResource(),
TableConvertor: resourceInfo.TableConverter(),
@@ -28,8 +41,16 @@ func NewRegistryStore(scheme *runtime.Scheme, resourceInfo utils.ResourceInfo, o
UpdateStrategy: strategy,
DeleteStrategy: strategy,
}
options := &generic.StoreOptions{RESTOptions: optsGetter, AttrFunc: GetAttrs}
if err := store.CompleteWithOptions(options); err != nil {
opts := &registryStoreOptions{
attrFunc: GetAttrs,
}
for _, opt := range options {
opt(opts)
}
o := &generic.StoreOptions{RESTOptions: optsGetter, AttrFunc: opts.attrFunc}
if err := store.CompleteWithOptions(o); err != nil {
return nil, err
}
return store, nil

View File

@@ -38,10 +38,10 @@ func (d *directResourceClient) GetBlob(ctx context.Context, in *resourcepb.GetBl
return d.server.GetBlob(ctx, in)
}
// GetStats implements ResourceClient.
func (d *directResourceClient) GetStats(ctx context.Context, in *resourcepb.ResourceStatsRequest, opts ...grpc.CallOption) (*resourcepb.ResourceStatsResponse, error) {
return d.server.GetStats(ctx, in)
}
//// GetStats implements ResourceClient.
//func (d *directResourceClient) GetStats(ctx context.Context, in *resourcepb.ResourceStatsRequest, opts ...grpc.CallOption) (*resourcepb.ResourceStatsResponse, error) {
// return d.server.GetStats(ctx, in)
//}
// IsHealthy implements ResourceClient.
func (d *directResourceClient) IsHealthy(ctx context.Context, in *resourcepb.HealthCheckRequest, opts ...grpc.CallOption) (*resourcepb.HealthCheckResponse, error) {
@@ -53,13 +53,13 @@ func (d *directResourceClient) List(ctx context.Context, in *resourcepb.ListRequ
return d.server.List(ctx, in)
}
func (d *directResourceClient) ListManagedObjects(ctx context.Context, in *resourcepb.ListManagedObjectsRequest, opts ...grpc.CallOption) (*resourcepb.ListManagedObjectsResponse, error) {
return d.server.ListManagedObjects(ctx, in)
}
func (d *directResourceClient) CountManagedObjects(ctx context.Context, in *resourcepb.CountManagedObjectsRequest, opts ...grpc.CallOption) (*resourcepb.CountManagedObjectsResponse, error) {
return d.server.CountManagedObjects(ctx, in)
}
//func (d *directResourceClient) ListManagedObjects(ctx context.Context, in *resourcepb.ListManagedObjectsRequest, opts ...grpc.CallOption) (*resourcepb.ListManagedObjectsResponse, error) {
// return d.server.ListManagedObjects(ctx, in)
//}
//
//func (d *directResourceClient) CountManagedObjects(ctx context.Context, in *resourcepb.CountManagedObjectsRequest, opts ...grpc.CallOption) (*resourcepb.CountManagedObjectsResponse, error) {
// return d.server.CountManagedObjects(ctx, in)
//}
// PutBlob implements ResourceClient.
func (d *directResourceClient) PutBlob(ctx context.Context, in *resourcepb.PutBlobRequest, opts ...grpc.CallOption) (*resourcepb.PutBlobResponse, error) {
@@ -72,9 +72,9 @@ func (d *directResourceClient) Read(ctx context.Context, in *resourcepb.ReadRequ
}
// Search implements ResourceClient.
func (d *directResourceClient) Search(ctx context.Context, in *resourcepb.ResourceSearchRequest, opts ...grpc.CallOption) (*resourcepb.ResourceSearchResponse, error) {
return d.server.Search(ctx, in)
}
//func (d *directResourceClient) Search(ctx context.Context, in *resourcepb.ResourceSearchRequest, opts ...grpc.CallOption) (*resourcepb.ResourceSearchResponse, error) {
// return d.server.Search(ctx, in)
//}
// Update implements ResourceClient.
func (d *directResourceClient) Update(ctx context.Context, in *resourcepb.UpdateRequest, opts ...grpc.CallOption) (*resourcepb.UpdateResponse, error) {

View File

@@ -11,6 +11,7 @@ import (
"k8s.io/apiserver/pkg/registry/rest"
"github.com/grafana/authlib/types"
"github.com/grafana/grafana/pkg/apimachinery/utils"
grafanaregistry "github.com/grafana/grafana/pkg/apiserver/registry/generic"
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"

View File

@@ -8,6 +8,8 @@ import (
"github.com/prometheus/client_golang/prometheus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/admission"
@@ -22,6 +24,8 @@ import (
authlib "github.com/grafana/authlib/types"
"github.com/grafana/grafana-app-sdk/logging"
sdkres "github.com/grafana/grafana-app-sdk/resource"
folders "github.com/grafana/grafana/apps/folder/pkg/apis/folder/v1beta1"
"github.com/grafana/grafana/apps/iam/pkg/reconcilers"
"github.com/grafana/grafana/pkg/apimachinery/utils"
@@ -37,7 +41,6 @@ import (
"github.com/grafana/grafana/pkg/services/folder"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/storage/unified/apistore"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
)
@@ -74,7 +77,7 @@ func RegisterAPIService(cfg *setting.Cfg,
acService accesscontrol.Service,
accessClient authlib.AccessClient,
registerer prometheus.Registerer,
unified resource.ResourceClient,
unified resourcepb.ResourceIndexClient,
zanzanaClient zanzana.Client,
) *FolderAPIBuilder {
builder := &FolderAPIBuilder{
@@ -93,7 +96,7 @@ func RegisterAPIService(cfg *setting.Cfg,
return builder
}
func NewAPIService(ac authlib.AccessClient, searcher resource.ResourceClient, features featuremgmt.FeatureToggles, zanzanaClient zanzana.Client, resourcePermissionsSvc *dynamic.NamespaceableResourceInterface) *FolderAPIBuilder {
func NewAPIService(ac authlib.AccessClient, searcher resourcepb.ResourceIndexClient, features featuremgmt.FeatureToggles, zanzanaClient zanzana.Client, resourcePermissionsSvc *dynamic.NamespaceableResourceInterface) *FolderAPIBuilder {
return &FolderAPIBuilder{
features: features,
accessClient: ac,
@@ -129,6 +132,29 @@ func (b *FolderAPIBuilder) InstallSchema(scheme *runtime.Scheme) error {
Version: runtime.APIVersionInternal,
})
kinds := []sdkres.Kind{folders.FolderKind()}
for _, kind := range kinds {
gvk := gv.WithKind(kind.Kind())
err := scheme.AddFieldLabelConversionFunc(
gvk,
func(label, value string) (string, string, error) {
if label == "metadata.name" || label == "metadata.namespace" {
return label, value, nil
}
fields := kind.SelectableFields()
for _, field := range fields {
if field.FieldSelector == label {
return label, value, nil
}
}
return "", "", fmt.Errorf("field label not supported for %s: %s", gvk, label)
},
)
if err != nil {
return err
}
}
// If multiple versions exist, then register conversions from zz_generated.conversion.go
// if err := playlist.RegisterConversions(scheme); err != nil {
// return err
@@ -137,6 +163,26 @@ func (b *FolderAPIBuilder) InstallSchema(scheme *runtime.Scheme) error {
return scheme.SetVersionPriority(gv)
}
// TODO: work with all kinds from schema, not just one.
func (b *FolderAPIBuilder) BuildGetAttrsFn(k sdkres.Kind) func(obj runtime.Object) (labels.Set, fields.Set, error) {
return func(obj runtime.Object) (labels.Set, fields.Set, error) {
if robj, ok := obj.(sdkres.Object); !ok {
return nil, nil, fmt.Errorf("not a resource.Object")
} else {
fieldsSet := fields.Set{}
for _, f := range k.SelectableFields() {
v, err := f.FieldValueFunc(robj)
if err != nil {
return nil, nil, err
}
fieldsSet[f.FieldSelector] = v
}
return robj.GetLabels(), fieldsSet, nil
}
}
}
func (b *FolderAPIBuilder) AllowedV0Alpha1Resources() []string {
return nil
}
@@ -148,10 +194,11 @@ func (b *FolderAPIBuilder) UpdateAPIGroupInfo(apiGroupInfo *genericapiserver.API
Permissions: b.setDefaultFolderPermissions,
})
unified, err := grafanaregistry.NewRegistryStore(opts.Scheme, resourceInfo, opts.OptsGetter)
unified, err := grafanaregistry.NewRegistryStore(opts.Scheme, resourceInfo, opts.OptsGetter, grafanaregistry.WithAttrFunc(b.BuildGetAttrsFn(folders.FolderKind())))
if err != nil {
return err
}
b.registerPermissionHooks(unified)
b.storage = unified

View File

@@ -615,7 +615,7 @@ func NewLocalStore(resourceInfo utils.ResourceInfo, scheme *runtime.Scheme, defa
return nil, err
}
client := resource.NewLocalResourceClient(server)
client := resource.NewLocalResourceClient(server, nil)
optsGetter := apistore.NewRESTOptionsGetterForClient(client, nil, defaultOpts.StorageConfig.Config, nil)
store, err := grafanaregistry.NewRegistryStore(scheme, resourceInfo, optsGetter)

View File

@@ -79,7 +79,7 @@ func NewRESTOptionsGetterMemory(originalStorageConfig storagebackend.Config, sec
}
return NewRESTOptionsGetterForClient(
resource.NewLocalResourceClient(server),
resource.NewLocalResourceClient(server, nil),
secrets,
originalStorageConfig,
nil,
@@ -118,7 +118,7 @@ func NewRESTOptionsGetterForFileXX(path string,
}
return NewRESTOptionsGetterForClient(
resource.NewLocalResourceClient(server),
resource.NewLocalResourceClient(server, nil),
nil, // secrets
originalStorageConfig,
nil,

View File

@@ -156,7 +156,7 @@ func testSetup(t testing.TB, opts ...setupOption) (context.Context, storage.Inte
default:
t.Fatalf("unsupported storage type: %s", setupOpts.storageType)
}
client := resource.NewLocalResourceClient(server)
client := resource.NewLocalResourceClient(server, nil)
config := storagebackend.NewDefaultConfig(setupOpts.prefix, setupOpts.codec)
store, destroyFunc, err := apistore.NewStorage(

View File

@@ -21,6 +21,7 @@ import (
"github.com/grafana/dskit/grpcclient"
"github.com/grafana/dskit/middleware"
"github.com/grafana/dskit/services"
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"
infraDB "github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/infra/tracing"
@@ -135,7 +136,7 @@ func newClient(opts options.StorageOptions,
if err != nil {
return nil, err
}
return resource.NewLocalResourceClient(server), nil
return resource.NewLocalResourceClient(server, nil), nil
case options.StorageTypeUnifiedGrpc:
if opts.Address == "" {
@@ -224,11 +225,11 @@ func newClient(opts options.StorageOptions,
serverOptions.OverridesService = overridesSvc
}
server, err := sql.NewResourceServer(serverOptions)
server, searchServer, err := sql.NewResourceServer(serverOptions)
if err != nil {
return nil, err
}
return resource.NewLocalResourceClient(server), nil
return resource.NewLocalResourceClient(server, searchServer), nil
}
}

View File

@@ -31,11 +31,14 @@ import (
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
)
type SearchClient interface {
resourcepb.ResourceIndexClient
resourcepb.ManagedObjectIndexClient
}
//go:generate mockery --name ResourceClient --structname MockResourceClient --inpackage --filename client_mock.go --with-expecter
type ResourceClient interface {
resourcepb.ResourceStoreClient
resourcepb.ResourceIndexClient
resourcepb.ManagedObjectIndexClient
resourcepb.BulkStoreClient
resourcepb.BlobStoreClient
resourcepb.DiagnosticsClient
@@ -100,8 +103,6 @@ func NewLocalResourceClient(server ResourceServer) ResourceClient {
grpcAuthInt := grpcutils.NewUnsafeAuthenticator(tracer)
for _, desc := range []*grpc.ServiceDesc{
&resourcepb.ResourceStore_ServiceDesc,
&resourcepb.ResourceIndex_ServiceDesc,
&resourcepb.ManagedObjectIndex_ServiceDesc,
&resourcepb.BlobStore_ServiceDesc,
&resourcepb.BulkStore_ServiceDesc,
&resourcepb.Diagnostics_ServiceDesc,

View File

@@ -3,9 +3,11 @@ package resource
import (
"context"
"fmt"
"strconv"
"strings"
"sync"
"github.com/grafana/grafana-app-sdk/app"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
@@ -101,6 +103,9 @@ type IndexableDocument struct {
// metadata, annotations, or external data linked at index time
Fields map[string]any `json:"fields,omitempty"`
// Automatically indexed selectable fields, used for field-based filtering when listing.
SelectableFields map[string]string `json:"selectable_fields,omitempty"`
// Maintain a list of resource references.
// Someday this will likely be part of https://github.com/grafana/gamma
References ResourceReferences `json:"references,omitempty"`
@@ -175,7 +180,7 @@ func (m ResourceReferences) Less(i, j int) bool {
}
// Create a new indexable document based on a generic k8s resource
func NewIndexableDocument(key *resourcepb.ResourceKey, rv int64, obj utils.GrafanaMetaAccessor) *IndexableDocument {
func NewIndexableDocument(key *resourcepb.ResourceKey, rv int64, obj utils.GrafanaMetaAccessor, selectableFields map[string]string) *IndexableDocument {
title := obj.FindTitle(key.Name)
if title == key.Name {
// TODO: something wrong with FindTitle
@@ -199,6 +204,7 @@ func NewIndexableDocument(key *resourcepb.ResourceKey, rv int64, obj utils.Grafa
Folder: obj.GetFolder(),
CreatedBy: obj.GetCreatedBy(),
UpdatedBy: obj.GetUpdatedBy(),
SelectableFields: selectableFields,
}
m, ok := obj.GetManagerProperties()
if ok {
@@ -220,11 +226,14 @@ func NewIndexableDocument(key *resourcepb.ResourceKey, rv int64, obj utils.Grafa
return doc.UpdateCopyFields()
}
func StandardDocumentBuilder() DocumentBuilder {
return &standardDocumentBuilder{}
func StandardDocumentBuilder(manifests []app.Manifest) DocumentBuilder {
return &standardDocumentBuilder{selectableFields: SelectableFieldsForManifests(manifests)}
}
type standardDocumentBuilder struct{}
type standardDocumentBuilder struct {
// Maps "group/resource" (in lowercase) to list of selectable fields.
selectableFields map[string][]string
}
func (s *standardDocumentBuilder) BuildDocument(ctx context.Context, key *resourcepb.ResourceKey, rv int64, value []byte) (*IndexableDocument, error) {
tmp := &unstructured.Unstructured{}
@@ -238,10 +247,36 @@ func (s *standardDocumentBuilder) BuildDocument(ctx context.Context, key *resour
return nil, err
}
doc := NewIndexableDocument(key, rv, obj)
sfKey := strings.ToLower(key.GetGroup() + "/" + key.GetResource())
selectableFields := buildSelectableFields(tmp, s.selectableFields[sfKey])
doc := NewIndexableDocument(key, rv, obj, selectableFields)
return doc, nil
}
func buildSelectableFields(tmp *unstructured.Unstructured, fields []string) map[string]string {
result := map[string]string{}
for _, field := range fields {
path := strings.Split(field, ".")
val, ok, err := unstructured.NestedFieldNoCopy(tmp.Object, path...)
if err != nil || !ok {
continue
}
switch v := val.(type) {
case string:
result[field] = v
case bool:
result[field] = strconv.FormatBool(v)
case int, float64:
result[field] = fmt.Sprintf("%v", v)
}
}
return result
}
type searchableDocumentFields struct {
names []string
fields map[string]*resourceTableColumn

View File

@@ -14,7 +14,7 @@ import (
func TestStandardDocumentBuilder(t *testing.T) {
ctx := context.Background()
builder := StandardDocumentBuilder()
builder := StandardDocumentBuilder(nil)
body, err := os.ReadFile("testdata/playlist-resource.json")
require.NoError(t, err)

View File

@@ -127,7 +127,10 @@ type SearchBackend interface {
GetOpenIndexes() []NamespacedResource
}
// This supports indexing+search regardless of implementation
var _ SearchServer = &searchSupport{}
// This supports indexing+search regardless of implementation.
// Implements SearchServer interface.
type searchSupport struct {
log log.Logger
storage StorageBackend
@@ -160,6 +163,10 @@ var (
_ resourcepb.ManagedObjectIndexServer = (*searchSupport)(nil)
)
func NewSearchServer(opts SearchOptions, storage StorageBackend, access types.AccessClient, blob BlobSupport, indexMetrics *BleveIndexMetrics, ownsIndexFn func(key NamespacedResource) (bool, error)) (SearchServer, error) {
return newSearchSupport(opts, storage, access, blob, indexMetrics, ownsIndexFn)
}
func newSearchSupport(opts SearchOptions, storage StorageBackend, access types.AccessClient, blob BlobSupport, indexMetrics *BleveIndexMetrics, ownsIndexFn func(key NamespacedResource) (bool, error)) (support *searchSupport, err error) {
// No backend search support
if opts.Backend == nil {
@@ -598,6 +605,15 @@ func (s *searchSupport) buildIndexes(ctx context.Context) (int, error) {
return totalBatchesIndexed, nil
}
func (s *searchSupport) Init(ctx context.Context) error {
return s.init(ctx)
}
func (s *searchSupport) Stop(_ context.Context) error {
s.stop()
return nil
}
func (s *searchSupport) init(ctx context.Context) error {
origCtx := ctx

View File

@@ -60,7 +60,7 @@ func ProvideSearchDistributorServer(cfg *setting.Cfg, features featuremgmt.Featu
}
type RingClient struct {
Client ResourceClient
Client SearchClient
grpc_health_v1.HealthClient
Conn *grpc.ClientConn
}
@@ -99,7 +99,7 @@ var (
func (ds *distributorServer) Search(ctx context.Context, r *resourcepb.ResourceSearchRequest) (*resourcepb.ResourceSearchResponse, error) {
ctx, span := ds.tracing.Start(ctx, "distributor.Search")
defer span.End()
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Options.Key.Namespace, "Search")
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Options.Key.Namespace)
if err != nil {
return nil, err
}
@@ -110,7 +110,7 @@ func (ds *distributorServer) Search(ctx context.Context, r *resourcepb.ResourceS
func (ds *distributorServer) GetStats(ctx context.Context, r *resourcepb.ResourceStatsRequest) (*resourcepb.ResourceStatsResponse, error) {
ctx, span := ds.tracing.Start(ctx, "distributor.GetStats")
defer span.End()
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace, "GetStats")
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace)
if err != nil {
return nil, err
}
@@ -215,7 +215,7 @@ func (ds *distributorServer) RebuildIndexes(ctx context.Context, r *resourcepb.R
func (ds *distributorServer) CountManagedObjects(ctx context.Context, r *resourcepb.CountManagedObjectsRequest) (*resourcepb.CountManagedObjectsResponse, error) {
ctx, span := ds.tracing.Start(ctx, "distributor.CountManagedObjects")
defer span.End()
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace, "CountManagedObjects")
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace)
if err != nil {
return nil, err
}
@@ -226,7 +226,7 @@ func (ds *distributorServer) CountManagedObjects(ctx context.Context, r *resourc
func (ds *distributorServer) ListManagedObjects(ctx context.Context, r *resourcepb.ListManagedObjectsRequest) (*resourcepb.ListManagedObjectsResponse, error) {
ctx, span := ds.tracing.Start(ctx, "distributor.ListManagedObjects")
defer span.End()
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace, "ListManagedObjects")
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace)
if err != nil {
return nil, err
}
@@ -234,7 +234,7 @@ func (ds *distributorServer) ListManagedObjects(ctx context.Context, r *resource
return client.ListManagedObjects(ctx, r)
}
func (ds *distributorServer) getClientToDistributeRequest(ctx context.Context, namespace string, methodName string) (context.Context, ResourceClient, error) {
func (ds *distributorServer) getClientToDistributeRequest(ctx context.Context, namespace string) (context.Context, SearchClient, error) {
ringHasher := fnv.New32a()
_, err := ringHasher.Write([]byte(namespace))
if err != nil {

View File

@@ -0,0 +1,45 @@
package resource
import (
"strings"
"github.com/grafana/grafana-app-sdk/app"
folder "github.com/grafana/grafana/apps/folder/pkg/apis/manifestdata"
iam "github.com/grafana/grafana/apps/iam/pkg/apis"
)
func AppManifests() []app.Manifest {
return []app.Manifest{
iam.LocalManifest(),
folder.LocalManifest(),
}
}
func SelectableFields() map[string][]string {
return SelectableFieldsForManifests(AppManifests())
}
// SelectableFieldsForManifests returns map of <group/kind> to list of selectable fields.
// Also <group/plural> is included as a key, pointing to the same fields.
func SelectableFieldsForManifests(manifests []app.Manifest) map[string][]string {
fields := map[string][]string{}
for _, m := range manifests {
group := m.ManifestData.Group
for _, version := range m.ManifestData.Versions {
for _, kind := range version.Kinds {
key := strings.ToLower(group + "/" + kind.Kind)
keyPlural := strings.ToLower(group + "/" + kind.Plural)
if len(kind.SelectableFields) > 0 {
fields[key] = kind.SelectableFields
fields[keyPlural] = kind.SelectableFields
}
}
}
}
return fields
}

View File

@@ -12,6 +12,8 @@ import (
"github.com/Masterminds/semver"
"github.com/google/uuid"
claims "github.com/grafana/authlib/types"
"github.com/grafana/dskit/backoff"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
@@ -20,9 +22,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
claims "github.com/grafana/authlib/types"
"github.com/grafana/dskit/backoff"
"github.com/grafana/grafana/pkg/apimachinery/utils"
"github.com/grafana/grafana/pkg/apimachinery/validation"
"github.com/grafana/grafana/pkg/infra/log"
@@ -33,12 +32,17 @@ import (
var tracer = otel.Tracer("github.com/grafana/grafana/pkg/storage/unified/resource")
type SearchServer interface {
LifecycleHooks
resourcepb.ResourceIndexServer
resourcepb.ManagedObjectIndexServer
}
// ResourceServer implements all gRPC services
type ResourceServer interface {
resourcepb.ResourceStoreServer
resourcepb.BulkStoreServer
resourcepb.ResourceIndexServer
resourcepb.ManagedObjectIndexServer
resourcepb.BlobStoreServer
resourcepb.DiagnosticsServer
resourcepb.QuotasServer
@@ -221,7 +225,8 @@ type ResourceServerOptions struct {
Blob BlobConfig
// Search options
Search SearchOptions
SearchOptions SearchOptions // TODO: needed?
Search SearchServer
// Quota service
OverridesService *OverridesService
@@ -250,16 +255,12 @@ type ResourceServerOptions struct {
storageMetrics *StorageMetrics
IndexMetrics *BleveIndexMetrics
// MaxPageSizeBytes is the maximum size of a page in bytes.
MaxPageSizeBytes int
// QOSQueue is the quality of service queue used to enqueue
QOSQueue QOSEnqueuer
QOSConfig QueueConfig
OwnsIndexFn func(key NamespacedResource) (bool, error)
}
func NewResourceServer(opts ResourceServerOptions) (*server, error) {
@@ -342,16 +343,17 @@ func NewResourceServer(opts ResourceServerOptions) (*server, error) {
ctx: ctx,
cancel: cancel,
storageMetrics: opts.storageMetrics,
indexMetrics: opts.IndexMetrics,
maxPageSizeBytes: opts.MaxPageSizeBytes,
reg: opts.Reg,
queue: opts.QOSQueue,
queueConfig: opts.QOSConfig,
overridesService: opts.OverridesService,
search: opts.Search,
artificialSuccessfulWriteDelay: opts.Search.IndexMinUpdateInterval,
artificialSuccessfulWriteDelay: opts.SearchOptions.IndexMinUpdateInterval,
}
/*
if opts.Search.Resources != nil {
var err error
s.search, err = newSearchSupport(opts.Search, s.backend, s.access, s.blob, opts.IndexMetrics, opts.OwnsIndexFn)
@@ -359,6 +361,7 @@ func NewResourceServer(opts ResourceServerOptions) (*server, error) {
return nil, err
}
}
*/
err := s.Init(ctx)
if err != nil {
@@ -376,7 +379,7 @@ type server struct {
backend StorageBackend
blob BlobSupport
secure secrets.InlineSecureValueSupport
search *searchSupport
search SearchServer
diagnostics resourcepb.DiagnosticsServer
access claims.AccessClient
writeHooks WriteAccessHooks
@@ -423,11 +426,6 @@ func (s *server) Init(ctx context.Context) error {
s.initErr = s.overridesService.init(ctx)
}
// initialize the search index
if s.initErr == nil && s.search != nil {
s.initErr = s.search.init(ctx)
}
// Start watching for changes
if s.initErr == nil {
s.initErr = s.initWatcher()
@@ -452,10 +450,6 @@ func (s *server) Stop(ctx context.Context) error {
}
}
if s.search != nil {
s.search.stop()
}
if s.overridesService != nil {
if err := s.overridesService.stop(ctx); err != nil {
stopFailed = true
@@ -1043,6 +1037,68 @@ func (s *server) List(ctx context.Context, req *resourcepb.ListRequest) (*resour
return rsp, nil
}
// Remove metadata.namespace filter from requirement fields, if it's present.
for ix := 0; ix < len(req.Options.Fields); {
v := req.Options.Fields[ix]
if v.Key == "metadata.namespace" && v.Operator == "=" {
if len(v.Values) == 1 && v.Values[0] == req.Options.Key.Namespace {
// Remove this requirement from fields, as it's implied by the key.namespace.
req.Options.Fields = append(req.Options.Fields[:ix], req.Options.Fields[ix+1:]...)
// Don't increment ix, as we're removing an element from the slice.
continue
}
}
ix++
}
// TODO: What to do about RV and version_match fields?
// If we get here, we're doing list with selectable fields. Let's do search instead, since
// we index all selectable fields, and fetch resulting documents one by one.
if s.search != nil && req.Source == resourcepb.ListRequest_STORE && (len(req.Options.Fields) > 0) {
if req.Options.Key.Namespace == "" {
return &resourcepb.ListResponse{
Error: NewBadRequestError("namespace must be specified for list with filter"),
}, nil
}
srq := &resourcepb.ResourceSearchRequest{
Options: req.Options,
//Federated: nil,
Limit: req.Limit,
// Offset: req.NextPageToken, // TODO
// Page: 0,
// Permission: 0, // Not needed, default is List
}
searchResp, err := s.search.Search(ctx, srq)
if err != nil {
return nil, err
}
rsp := &resourcepb.ListResponse{}
// Using searchResp.GetResults().GetRows() will not panic if anything is nil on the path.
for _, row := range searchResp.GetResults().GetRows() {
// TODO: use batch reading
val, err := s.Read(ctx, &resourcepb.ReadRequest{
Key: row.Key,
ResourceVersion: row.ResourceVersion,
})
if err != nil {
return &resourcepb.ListResponse{Error: AsErrorResult(err)}, nil
}
if len(val.Value) > 0 {
rsp.Items = append(rsp.Items, &resourcepb.ResourceWrapper{
Value: val.Value,
ResourceVersion: val.ResourceVersion,
})
if val.ResourceVersion > rsp.ResourceVersion {
rsp.ResourceVersion = val.ResourceVersion
}
}
}
return rsp, nil
}
if req.Limit < 1 {
req.Limit = 500 // default max 500 items in a page
}
@@ -1371,47 +1427,6 @@ func (s *server) Watch(req *resourcepb.WatchRequest, srv resourcepb.ResourceStor
}
}
func (s *server) Search(ctx context.Context, req *resourcepb.ResourceSearchRequest) (*resourcepb.ResourceSearchResponse, error) {
if s.search == nil {
return nil, fmt.Errorf("search index not configured")
}
return s.search.Search(ctx, req)
}
// GetStats implements ResourceServer.
func (s *server) GetStats(ctx context.Context, req *resourcepb.ResourceStatsRequest) (*resourcepb.ResourceStatsResponse, error) {
if err := s.Init(ctx); err != nil {
return nil, err
}
if s.search == nil {
// If the backend implements "GetStats", we can use it
srv, ok := s.backend.(resourcepb.ResourceIndexServer)
if ok {
return srv.GetStats(ctx, req)
}
return nil, fmt.Errorf("search index not configured")
}
return s.search.GetStats(ctx, req)
}
func (s *server) ListManagedObjects(ctx context.Context, req *resourcepb.ListManagedObjectsRequest) (*resourcepb.ListManagedObjectsResponse, error) {
if s.search == nil {
return nil, fmt.Errorf("search index not configured")
}
return s.search.ListManagedObjects(ctx, req)
}
func (s *server) CountManagedObjects(ctx context.Context, req *resourcepb.CountManagedObjectsRequest) (*resourcepb.CountManagedObjectsResponse, error) {
if s.search == nil {
return nil, fmt.Errorf("search index not configured")
}
return s.search.CountManagedObjects(ctx, req)
}
// IsHealthy implements ResourceServer.
func (s *server) IsHealthy(ctx context.Context, req *resourcepb.HealthCheckRequest) (*resourcepb.HealthCheckResponse, error) {
return s.diagnostics.IsHealthy(ctx, req)

View File

@@ -59,7 +59,6 @@ type kvStorageBackend struct {
dataStore *dataStore
eventStore *eventStore
notifier *notifier
builder DocumentBuilder
log logging.Logger
withPruner bool
eventRetentionPeriod time.Duration
@@ -109,7 +108,6 @@ func NewKVStorageBackend(opts KVBackendOptions) (StorageBackend, error) {
eventStore: eventStore,
notifier: newNotifier(eventStore, notifierOptions{}),
snowflake: s,
builder: StandardDocumentBuilder(), // For now we use the standard document builder.
log: &logging.NoOpLogger{}, // Make this configurable
eventRetentionPeriod: eventRetentionPeriod,
eventPruningInterval: eventPruningInterval,

View File

@@ -93,6 +93,8 @@ type bleveBackend struct {
indexMetrics *resource.BleveIndexMetrics
selectableFields map[string][]string
bgTasksCancel func()
bgTasksWg sync.WaitGroup
}
@@ -140,6 +142,7 @@ func NewBleveBackend(opts BleveOptions, indexMetrics *resource.BleveIndexMetrics
opts: opts,
ownsIndexFn: ownFn,
indexMetrics: indexMetrics,
selectableFields: resource.SelectableFields(),
}
ctx, cancel := context.WithCancel(context.Background())
@@ -366,7 +369,9 @@ func (b *bleveBackend) BuildIndex(
attribute.String("reason", indexBuildReason),
)
mapper, err := GetBleveMappings(fields)
selectableFields := b.selectableFields[fmt.Sprintf("%s/%s", key.Group, key.Resource)]
mapper, err := GetBleveMappings(fields, selectableFields)
if err != nil {
return nil, err
}
@@ -459,7 +464,7 @@ func (b *bleveBackend) BuildIndex(
}
// Batch all the changes
idx := b.newBleveIndex(key, index, newIndexType, fields, allFields, standardSearchFields, updater, b.log.New("namespace", key.Namespace, "group", key.Group, "resource", key.Resource))
idx := b.newBleveIndex(key, index, newIndexType, fields, allFields, standardSearchFields, selectableFields, updater, b.log.New("namespace", key.Namespace, "group", key.Group, "resource", key.Resource))
if build {
if b.indexMetrics != nil {
@@ -701,6 +706,7 @@ type bleveIndex struct {
standard resource.SearchableDocumentFields
fields resource.SearchableDocumentFields
selectableFields []string
indexStorage string // memory or file, used when updating metrics
@@ -736,6 +742,7 @@ func (b *bleveBackend) newBleveIndex(
fields resource.SearchableDocumentFields,
allFields []*resourcepb.ResourceTableColumnDefinition,
standardSearchFields resource.SearchableDocumentFields,
selectableFields []string,
updaterFn resource.UpdateFn,
logger log.Logger,
) *bleveIndex {
@@ -745,6 +752,7 @@ func (b *bleveBackend) newBleveIndex(
indexStorage: newIndexType,
fields: fields,
allFields: allFields,
selectableFields: selectableFields,
standard: standardSearchFields,
logger: logger,
updaterFn: updaterFn,
@@ -1215,7 +1223,11 @@ func (b *bleveIndex) toBleveSearchRequest(ctx context.Context, req *resourcepb.R
// filters
if len(req.Options.Fields) > 0 {
for _, v := range req.Options.Fields {
q, err := requirementQuery(v, "")
prefix := ""
if b.isSelectableField(v.Key) {
prefix = "selectable_fields."
}
q, err := requirementQuery(v, prefix)
if err != nil {
return nil, err
}
@@ -1787,6 +1799,15 @@ func (b *bleveIndex) hitsToTable(ctx context.Context, selectFields []string, hit
return table, nil
}
func (b *bleveIndex) isSelectableField(key string) bool {
for _, f := range b.selectableFields {
if key == f {
return true
}
}
return false
}
func getAllFields(standard resource.SearchableDocumentFields, custom resource.SearchableDocumentFields) ([]*resourcepb.ResourceTableColumnDefinition, error) {
fields := []*resourcepb.ResourceTableColumnDefinition{
standard.Field(resource.SEARCH_FIELD_ID),

View File

@@ -10,19 +10,19 @@ import (
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
)
func GetBleveMappings(fields resource.SearchableDocumentFields) (mapping.IndexMapping, error) {
func GetBleveMappings(fields resource.SearchableDocumentFields, selectableFields []string) (mapping.IndexMapping, error) {
mapper := bleve.NewIndexMapping()
err := RegisterCustomAnalyzers(mapper)
if err != nil {
return nil, err
}
mapper.DefaultMapping = getBleveDocMappings(fields)
mapper.DefaultMapping = getBleveDocMappings(fields, selectableFields)
return mapper, nil
}
func getBleveDocMappings(fields resource.SearchableDocumentFields) *mapping.DocumentMapping {
func getBleveDocMappings(fields resource.SearchableDocumentFields, selectableFields []string) *mapping.DocumentMapping {
mapper := bleve.NewDocumentStaticMapping()
nameMapping := &mapping.FieldMapping{
@@ -165,5 +165,73 @@ func getBleveDocMappings(fields resource.SearchableDocumentFields) *mapping.Docu
mapper.AddSubDocumentMapping("fields", fieldMapper)
selectableFieldsMapper := bleve.NewDocumentStaticMapping()
for _, field := range selectableFields {
selectableFieldsMapper.AddFieldMappingsAt(field, &mapping.FieldMapping{
Name: field,
Type: "text",
Analyzer: keyword.Name,
Store: false,
Index: true,
})
}
mapper.AddSubDocumentMapping("selectable_fields", selectableFieldsMapper)
return mapper
}
/*
Here's a tree representation of the field mappings in pkg/storage/unified/search/bleve_mappings.go:
Document Root (DefaultMapping)
├── name [text, keyword analyzer]
├── title_phrase [keyword, not stored]
├── title [3 mappings]
│ ├── [1] standard analyzer, stored
│ ├── [2] TITLE_ANALYZER (edge ngram), not stored
│ └── [3] keyword, not stored
├── description [text, stored]
├── tags [text, keyword analyzer, stored, includeInAll]
├── folder [text, keyword analyzer, stored, includeInAll, docValues]
├── managedBy [text, keyword analyzer, not stored]
├── source/ [sub-document]
│ ├── path [text, keyword analyzer, stored]
│ ├── checksum [text, keyword analyzer, stored]
│ └── timestampMillis [numeric]
├── manager/ [sub-document]
│ ├── kind [text, keyword analyzer, stored, includeInAll]
│ └── id [text, keyword analyzer, stored, includeInAll]
├── reference/ [sub-document, default analyzer: keyword]
│ └── (dynamic fields inherit keyword analyzer)
├── labels/ [sub-document]
│ └── (dynamic fields)
└── fields/ [sub-document]
└── (conditional mappings)
├── {filterable string fields} [keyword, stored]
└── {other fields} [dynamically mapped by Bleve]
Key observations:
- Root level has standard searchable fields (name, title, description, tags, folder)
- title has 3 analyzers applied: standard (for word search), edge ngram (for prefix search), and keyword (for phrase sorting)
- source/, manager/: Static sub-documents with explicitly mapped fields
- reference/: Dynamic sub-document with keyword default analyzer (line 142)
- labels/, fields/: Dynamic sub-documents where Bleve auto-detects field types at index time
References:
- Main mapping function: pkg/storage/unified/search/bleve_mappings.go:25-169
- Sub-document mappings: lines 88-143
- Dynamic fields handling: lines 148-166
*/

View File

@@ -13,7 +13,7 @@ import (
)
func TestDocumentMapping(t *testing.T) {
mappings, err := search.GetBleveMappings(nil)
mappings, err := search.GetBleveMappings(nil, nil)
require.NoError(t, err)
data := resource.IndexableDocument{
Title: "title",

View File

@@ -264,7 +264,7 @@ func (s *DashboardDocumentBuilder) BuildDocument(ctx context.Context, key *resou
summary.UID = obj.GetName()
summary.ID = obj.GetDeprecatedInternalID() // nolint:staticcheck
doc := resource.NewIndexableDocument(key, rv, obj)
doc := resource.NewIndexableDocument(key, rv, obj, nil)
doc.Title = summary.Title
doc.Description = summary.Description
doc.Tags = summary.Tags

View File

@@ -115,7 +115,7 @@ func TestDashboardDocumentBuilder(t *testing.T) {
"aaa",
})
builder = resource.StandardDocumentBuilder()
builder = resource.StandardDocumentBuilder(nil)
doSnapshotTests(t, builder, "folder", &resourcepb.ResourceKey{
Namespace: "default",
Group: "folder.grafana.app",

View File

@@ -66,7 +66,7 @@ func (u *extGroupMappingDocumentBuilder) BuildDocument(ctx context.Context, key
return nil, err
}
doc := resource.NewIndexableDocument(key, rv, obj)
doc := resource.NewIndexableDocument(key, rv, obj, nil)
doc.Fields = make(map[string]any)
if extGroupMapping.Spec.TeamRef.Name != "" {

View File

@@ -70,7 +70,7 @@ func (t *teamSearchBuilder) BuildDocument(ctx context.Context, key *resourcepb.R
return nil, err
}
doc := resource.NewIndexableDocument(key, rv, obj)
doc := resource.NewIndexableDocument(key, rv, obj, nil)
doc.Fields = make(map[string]any)
if team.Spec.Email != "" {

View File

@@ -66,7 +66,7 @@ func (u *userDocumentBuilder) BuildDocument(ctx context.Context, key *resourcepb
return nil, err
}
doc := resource.NewIndexableDocument(key, rv, obj)
doc := resource.NewIndexableDocument(key, rv, obj, nil)
doc.Fields = make(map[string]any)
if user.Spec.Email != "" {

View File

@@ -25,7 +25,7 @@ func (s *StandardDocumentBuilders) GetDocumentBuilders() ([]resource.DocumentBui
result := []resource.DocumentBuilderInfo{
{
Builder: resource.StandardDocumentBuilder(),
Builder: resource.StandardDocumentBuilder(resource.AppManifests()),
},
}
return append(result, all...), nil

View File

@@ -46,7 +46,7 @@ type ServerOptions struct {
OwnsIndexFn func(key resource.NamespacedResource) (bool, error)
}
func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
func NewResourceServer(opts ServerOptions) (resource.ResourceServer, resource.SearchServer, error) {
apiserverCfg := opts.Cfg.SectionWithEnvOverrides("grafana-apiserver")
if opts.SecureValues == nil && opts.Cfg != nil && opts.Cfg.SecretsManagement.GrpcClientEnable {
@@ -57,7 +57,7 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
nil, // not needed for gRPC client mode
)
if err != nil {
return nil, fmt.Errorf("failed to create inline secure value service: %w", err)
return nil, nil, fmt.Errorf("failed to create inline secure value service: %w", err)
}
opts.SecureValues = inlineSecureValueService
}
@@ -77,7 +77,7 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
dir := strings.Replace(serverOptions.Blob.URL, "./data", opts.Cfg.DataPath, 1)
err := os.MkdirAll(dir, 0700)
if err != nil {
return nil, err
return nil, nil, err
}
serverOptions.Blob.URL = "file:///" + dir
}
@@ -94,7 +94,7 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
} else {
eDB, err := dbimpl.ProvideResourceDB(opts.DB, opts.Cfg, opts.Tracer)
if err != nil {
return nil, err
return nil, nil, err
}
isHA := isHighAvailabilityEnabled(opts.Cfg.SectionWithEnvOverrides("database"),
@@ -108,20 +108,31 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
LastImportTimeMaxAge: opts.SearchOptions.MaxIndexAge, // No need to keep last_import_times older than max index age.
})
if err != nil {
return nil, err
return nil, nil, err
}
serverOptions.Backend = backend
serverOptions.Diagnostics = backend
serverOptions.Lifecycle = backend
}
serverOptions.Search = opts.SearchOptions
serverOptions.IndexMetrics = opts.IndexMetrics
search, err := resource.NewSearchServer(opts.SearchOptions, opts.Backend, opts.AccessClient, nil, opts.IndexMetrics, opts.OwnsIndexFn)
if err != nil {
return nil, nil, fmt.Errorf("failed to initialize search: %w", err)
}
if err := search.Init(context.Background()); err != nil {
return nil, nil, fmt.Errorf("failed to initialize search: %w", err)
}
serverOptions.Search = search
serverOptions.QOSQueue = opts.QOSQueue
serverOptions.OwnsIndexFn = opts.OwnsIndexFn
serverOptions.OverridesService = opts.OverridesService
return resource.NewResourceServer(serverOptions)
rs, err := resource.NewResourceServer(serverOptions)
if err != nil {
_ = search.Stop(context.Background())
}
return rs, nil, err
}
// isHighAvailabilityEnabled determines if high availability mode should

View File

@@ -291,7 +291,7 @@ func (s *service) starting(ctx context.Context) error {
serverOptions.OverridesService = overridesSvc
}
server, err := NewResourceServer(serverOptions)
server, searchServer, err := NewResourceServer(serverOptions)
if err != nil {
return err
}
@@ -308,8 +308,8 @@ func (s *service) starting(ctx context.Context) error {
srv := s.handler.GetServer()
resourcepb.RegisterResourceStoreServer(srv, server)
resourcepb.RegisterBulkStoreServer(srv, server)
resourcepb.RegisterResourceIndexServer(srv, server)
resourcepb.RegisterManagedObjectIndexServer(srv, server)
resourcepb.RegisterResourceIndexServer(srv, searchServer)
resourcepb.RegisterManagedObjectIndexServer(srv, searchServer)
resourcepb.RegisterBlobStoreServer(srv, server)
resourcepb.RegisterDiagnosticsServer(srv, server)
resourcepb.RegisterQuotasServer(srv, server)