mirror of
https://github.com/grafana/grafana.git
synced 2025-12-21 12:04:45 +08:00
Compare commits
2 Commits
docs/add-t
...
selectable
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c7293f6fb5 | ||
|
|
584287dc61 |
@@ -8,6 +8,12 @@ foldersV1beta1: {
|
||||
spec: {
|
||||
title: string
|
||||
description?: string
|
||||
foo: bool
|
||||
bar: int
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
selectableFields: [
|
||||
"spec.title",
|
||||
]
|
||||
}
|
||||
|
||||
@@ -5,13 +5,26 @@
|
||||
package v1beta1
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
"github.com/grafana/grafana-app-sdk/resource"
|
||||
)
|
||||
|
||||
// schema is unexported to prevent accidental overwrites
|
||||
var (
|
||||
schemaFolder = resource.NewSimpleSchema("folder.grafana.app", "v1beta1", NewFolder(), &FolderList{}, resource.WithKind("Folder"),
|
||||
resource.WithPlural("folders"), resource.WithScope(resource.NamespacedScope))
|
||||
resource.WithPlural("folders"), resource.WithScope(resource.NamespacedScope), resource.WithSelectableFields([]resource.SelectableField{resource.SelectableField{
|
||||
FieldSelector: "spec.title",
|
||||
FieldValueFunc: func(o resource.Object) (string, error) {
|
||||
cast, ok := o.(*Folder)
|
||||
if !ok {
|
||||
return "", errors.New("provided object must be of type *Folder")
|
||||
}
|
||||
|
||||
return cast.Spec.Title, nil
|
||||
},
|
||||
},
|
||||
}))
|
||||
kindFolder = resource.Kind{
|
||||
Schema: schemaFolder,
|
||||
Codecs: map[resource.KindEncoding]resource.Codec{
|
||||
|
||||
@@ -18,6 +18,8 @@ import (
|
||||
v1beta1 "github.com/grafana/grafana/apps/folder/pkg/apis/folder/v1beta1"
|
||||
)
|
||||
|
||||
var ()
|
||||
|
||||
var appManifestData = app.ManifestData{
|
||||
AppName: "folder",
|
||||
Group: "folder.grafana.app",
|
||||
@@ -32,6 +34,9 @@ var appManifestData = app.ManifestData{
|
||||
Plural: "Folders",
|
||||
Scope: "Namespaced",
|
||||
Conversion: false,
|
||||
SelectableFields: []string{
|
||||
"spec.title",
|
||||
},
|
||||
},
|
||||
},
|
||||
Routes: app.ManifestVersionRoutes{
|
||||
|
||||
@@ -14,7 +14,7 @@ userKind: {
|
||||
}
|
||||
|
||||
userv0alpha1: userKind & {
|
||||
// TODO: Uncomment this when User will be added to ManagedKinds
|
||||
// TODO: Uncomment this when User will be added to ManagedKinds
|
||||
// validation: {
|
||||
// operations: [
|
||||
// "CREATE",
|
||||
|
||||
4
apps/iam/pkg/apis/iam_manifest.go
generated
4
apps/iam/pkg/apis/iam_manifest.go
generated
@@ -74,6 +74,10 @@ var appManifestData = app.ManifestData{
|
||||
Plural: "Users",
|
||||
Scope: "Namespaced",
|
||||
Conversion: false,
|
||||
SelectableFields: []string{
|
||||
"spec.email",
|
||||
"spec.login",
|
||||
},
|
||||
},
|
||||
|
||||
{
|
||||
|
||||
@@ -4,11 +4,24 @@ import (
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apiserver/pkg/registry/generic"
|
||||
"k8s.io/apiserver/pkg/registry/generic/registry"
|
||||
"k8s.io/apiserver/pkg/storage"
|
||||
|
||||
"github.com/grafana/grafana/pkg/apimachinery/utils"
|
||||
)
|
||||
|
||||
func NewRegistryStore(scheme *runtime.Scheme, resourceInfo utils.ResourceInfo, optsGetter generic.RESTOptionsGetter) (*registry.Store, error) {
|
||||
type registryStoreOptions struct {
|
||||
attrFunc storage.AttrFunc
|
||||
}
|
||||
|
||||
type OptionFn func(*registryStoreOptions)
|
||||
|
||||
func WithAttrFunc(attrFunc storage.AttrFunc) OptionFn {
|
||||
return func(opts *registryStoreOptions) {
|
||||
opts.attrFunc = attrFunc
|
||||
}
|
||||
}
|
||||
|
||||
func NewRegistryStore(scheme *runtime.Scheme, resourceInfo utils.ResourceInfo, optsGetter generic.RESTOptionsGetter, options ...OptionFn) (*registry.Store, error) {
|
||||
gv := resourceInfo.GroupVersion()
|
||||
gv.Version = runtime.APIVersionInternal
|
||||
strategy := NewStrategy(scheme, gv)
|
||||
@@ -20,7 +33,7 @@ func NewRegistryStore(scheme *runtime.Scheme, resourceInfo utils.ResourceInfo, o
|
||||
NewListFunc: resourceInfo.NewListFunc,
|
||||
KeyRootFunc: KeyRootFunc(resourceInfo.GroupResource()),
|
||||
KeyFunc: NamespaceKeyFunc(resourceInfo.GroupResource()),
|
||||
PredicateFunc: Matcher,
|
||||
//PredicateFunc: Matcher,
|
||||
DefaultQualifiedResource: resourceInfo.GroupResource(),
|
||||
SingularQualifiedResource: resourceInfo.SingularGroupResource(),
|
||||
TableConvertor: resourceInfo.TableConverter(),
|
||||
@@ -28,8 +41,16 @@ func NewRegistryStore(scheme *runtime.Scheme, resourceInfo utils.ResourceInfo, o
|
||||
UpdateStrategy: strategy,
|
||||
DeleteStrategy: strategy,
|
||||
}
|
||||
options := &generic.StoreOptions{RESTOptions: optsGetter, AttrFunc: GetAttrs}
|
||||
if err := store.CompleteWithOptions(options); err != nil {
|
||||
|
||||
opts := ®istryStoreOptions{
|
||||
attrFunc: GetAttrs,
|
||||
}
|
||||
for _, opt := range options {
|
||||
opt(opts)
|
||||
}
|
||||
|
||||
o := &generic.StoreOptions{RESTOptions: optsGetter, AttrFunc: opts.attrFunc}
|
||||
if err := store.CompleteWithOptions(o); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return store, nil
|
||||
|
||||
@@ -38,10 +38,10 @@ func (d *directResourceClient) GetBlob(ctx context.Context, in *resourcepb.GetBl
|
||||
return d.server.GetBlob(ctx, in)
|
||||
}
|
||||
|
||||
// GetStats implements ResourceClient.
|
||||
func (d *directResourceClient) GetStats(ctx context.Context, in *resourcepb.ResourceStatsRequest, opts ...grpc.CallOption) (*resourcepb.ResourceStatsResponse, error) {
|
||||
return d.server.GetStats(ctx, in)
|
||||
}
|
||||
//// GetStats implements ResourceClient.
|
||||
//func (d *directResourceClient) GetStats(ctx context.Context, in *resourcepb.ResourceStatsRequest, opts ...grpc.CallOption) (*resourcepb.ResourceStatsResponse, error) {
|
||||
// return d.server.GetStats(ctx, in)
|
||||
//}
|
||||
|
||||
// IsHealthy implements ResourceClient.
|
||||
func (d *directResourceClient) IsHealthy(ctx context.Context, in *resourcepb.HealthCheckRequest, opts ...grpc.CallOption) (*resourcepb.HealthCheckResponse, error) {
|
||||
@@ -53,13 +53,13 @@ func (d *directResourceClient) List(ctx context.Context, in *resourcepb.ListRequ
|
||||
return d.server.List(ctx, in)
|
||||
}
|
||||
|
||||
func (d *directResourceClient) ListManagedObjects(ctx context.Context, in *resourcepb.ListManagedObjectsRequest, opts ...grpc.CallOption) (*resourcepb.ListManagedObjectsResponse, error) {
|
||||
return d.server.ListManagedObjects(ctx, in)
|
||||
}
|
||||
|
||||
func (d *directResourceClient) CountManagedObjects(ctx context.Context, in *resourcepb.CountManagedObjectsRequest, opts ...grpc.CallOption) (*resourcepb.CountManagedObjectsResponse, error) {
|
||||
return d.server.CountManagedObjects(ctx, in)
|
||||
}
|
||||
//func (d *directResourceClient) ListManagedObjects(ctx context.Context, in *resourcepb.ListManagedObjectsRequest, opts ...grpc.CallOption) (*resourcepb.ListManagedObjectsResponse, error) {
|
||||
// return d.server.ListManagedObjects(ctx, in)
|
||||
//}
|
||||
//
|
||||
//func (d *directResourceClient) CountManagedObjects(ctx context.Context, in *resourcepb.CountManagedObjectsRequest, opts ...grpc.CallOption) (*resourcepb.CountManagedObjectsResponse, error) {
|
||||
// return d.server.CountManagedObjects(ctx, in)
|
||||
//}
|
||||
|
||||
// PutBlob implements ResourceClient.
|
||||
func (d *directResourceClient) PutBlob(ctx context.Context, in *resourcepb.PutBlobRequest, opts ...grpc.CallOption) (*resourcepb.PutBlobResponse, error) {
|
||||
@@ -72,9 +72,9 @@ func (d *directResourceClient) Read(ctx context.Context, in *resourcepb.ReadRequ
|
||||
}
|
||||
|
||||
// Search implements ResourceClient.
|
||||
func (d *directResourceClient) Search(ctx context.Context, in *resourcepb.ResourceSearchRequest, opts ...grpc.CallOption) (*resourcepb.ResourceSearchResponse, error) {
|
||||
return d.server.Search(ctx, in)
|
||||
}
|
||||
//func (d *directResourceClient) Search(ctx context.Context, in *resourcepb.ResourceSearchRequest, opts ...grpc.CallOption) (*resourcepb.ResourceSearchResponse, error) {
|
||||
// return d.server.Search(ctx, in)
|
||||
//}
|
||||
|
||||
// Update implements ResourceClient.
|
||||
func (d *directResourceClient) Update(ctx context.Context, in *resourcepb.UpdateRequest, opts ...grpc.CallOption) (*resourcepb.UpdateResponse, error) {
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
"k8s.io/apiserver/pkg/registry/rest"
|
||||
|
||||
"github.com/grafana/authlib/types"
|
||||
|
||||
"github.com/grafana/grafana/pkg/apimachinery/utils"
|
||||
grafanaregistry "github.com/grafana/grafana/pkg/apiserver/registry/generic"
|
||||
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"
|
||||
|
||||
@@ -8,6 +8,8 @@ import (
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/fields"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apiserver/pkg/admission"
|
||||
@@ -22,6 +24,8 @@ import (
|
||||
authlib "github.com/grafana/authlib/types"
|
||||
"github.com/grafana/grafana-app-sdk/logging"
|
||||
|
||||
sdkres "github.com/grafana/grafana-app-sdk/resource"
|
||||
|
||||
folders "github.com/grafana/grafana/apps/folder/pkg/apis/folder/v1beta1"
|
||||
"github.com/grafana/grafana/apps/iam/pkg/reconcilers"
|
||||
"github.com/grafana/grafana/pkg/apimachinery/utils"
|
||||
@@ -37,7 +41,6 @@ import (
|
||||
"github.com/grafana/grafana/pkg/services/folder"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/apistore"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resource"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
||||
)
|
||||
|
||||
@@ -74,7 +77,7 @@ func RegisterAPIService(cfg *setting.Cfg,
|
||||
acService accesscontrol.Service,
|
||||
accessClient authlib.AccessClient,
|
||||
registerer prometheus.Registerer,
|
||||
unified resource.ResourceClient,
|
||||
unified resourcepb.ResourceIndexClient,
|
||||
zanzanaClient zanzana.Client,
|
||||
) *FolderAPIBuilder {
|
||||
builder := &FolderAPIBuilder{
|
||||
@@ -93,7 +96,7 @@ func RegisterAPIService(cfg *setting.Cfg,
|
||||
return builder
|
||||
}
|
||||
|
||||
func NewAPIService(ac authlib.AccessClient, searcher resource.ResourceClient, features featuremgmt.FeatureToggles, zanzanaClient zanzana.Client, resourcePermissionsSvc *dynamic.NamespaceableResourceInterface) *FolderAPIBuilder {
|
||||
func NewAPIService(ac authlib.AccessClient, searcher resourcepb.ResourceIndexClient, features featuremgmt.FeatureToggles, zanzanaClient zanzana.Client, resourcePermissionsSvc *dynamic.NamespaceableResourceInterface) *FolderAPIBuilder {
|
||||
return &FolderAPIBuilder{
|
||||
features: features,
|
||||
accessClient: ac,
|
||||
@@ -129,6 +132,29 @@ func (b *FolderAPIBuilder) InstallSchema(scheme *runtime.Scheme) error {
|
||||
Version: runtime.APIVersionInternal,
|
||||
})
|
||||
|
||||
kinds := []sdkres.Kind{folders.FolderKind()}
|
||||
for _, kind := range kinds {
|
||||
gvk := gv.WithKind(kind.Kind())
|
||||
err := scheme.AddFieldLabelConversionFunc(
|
||||
gvk,
|
||||
func(label, value string) (string, string, error) {
|
||||
if label == "metadata.name" || label == "metadata.namespace" {
|
||||
return label, value, nil
|
||||
}
|
||||
fields := kind.SelectableFields()
|
||||
for _, field := range fields {
|
||||
if field.FieldSelector == label {
|
||||
return label, value, nil
|
||||
}
|
||||
}
|
||||
return "", "", fmt.Errorf("field label not supported for %s: %s", gvk, label)
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// If multiple versions exist, then register conversions from zz_generated.conversion.go
|
||||
// if err := playlist.RegisterConversions(scheme); err != nil {
|
||||
// return err
|
||||
@@ -137,6 +163,26 @@ func (b *FolderAPIBuilder) InstallSchema(scheme *runtime.Scheme) error {
|
||||
return scheme.SetVersionPriority(gv)
|
||||
}
|
||||
|
||||
// TODO: work with all kinds from schema, not just one.
|
||||
func (b *FolderAPIBuilder) BuildGetAttrsFn(k sdkres.Kind) func(obj runtime.Object) (labels.Set, fields.Set, error) {
|
||||
return func(obj runtime.Object) (labels.Set, fields.Set, error) {
|
||||
if robj, ok := obj.(sdkres.Object); !ok {
|
||||
return nil, nil, fmt.Errorf("not a resource.Object")
|
||||
} else {
|
||||
fieldsSet := fields.Set{}
|
||||
|
||||
for _, f := range k.SelectableFields() {
|
||||
v, err := f.FieldValueFunc(robj)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
fieldsSet[f.FieldSelector] = v
|
||||
}
|
||||
return robj.GetLabels(), fieldsSet, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (b *FolderAPIBuilder) AllowedV0Alpha1Resources() []string {
|
||||
return nil
|
||||
}
|
||||
@@ -148,10 +194,11 @@ func (b *FolderAPIBuilder) UpdateAPIGroupInfo(apiGroupInfo *genericapiserver.API
|
||||
Permissions: b.setDefaultFolderPermissions,
|
||||
})
|
||||
|
||||
unified, err := grafanaregistry.NewRegistryStore(opts.Scheme, resourceInfo, opts.OptsGetter)
|
||||
unified, err := grafanaregistry.NewRegistryStore(opts.Scheme, resourceInfo, opts.OptsGetter, grafanaregistry.WithAttrFunc(b.BuildGetAttrsFn(folders.FolderKind())))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
b.registerPermissionHooks(unified)
|
||||
b.storage = unified
|
||||
|
||||
|
||||
@@ -615,7 +615,7 @@ func NewLocalStore(resourceInfo utils.ResourceInfo, scheme *runtime.Scheme, defa
|
||||
return nil, err
|
||||
}
|
||||
|
||||
client := resource.NewLocalResourceClient(server)
|
||||
client := resource.NewLocalResourceClient(server, nil)
|
||||
optsGetter := apistore.NewRESTOptionsGetterForClient(client, nil, defaultOpts.StorageConfig.Config, nil)
|
||||
|
||||
store, err := grafanaregistry.NewRegistryStore(scheme, resourceInfo, optsGetter)
|
||||
|
||||
@@ -79,7 +79,7 @@ func NewRESTOptionsGetterMemory(originalStorageConfig storagebackend.Config, sec
|
||||
}
|
||||
|
||||
return NewRESTOptionsGetterForClient(
|
||||
resource.NewLocalResourceClient(server),
|
||||
resource.NewLocalResourceClient(server, nil),
|
||||
secrets,
|
||||
originalStorageConfig,
|
||||
nil,
|
||||
@@ -118,7 +118,7 @@ func NewRESTOptionsGetterForFileXX(path string,
|
||||
}
|
||||
|
||||
return NewRESTOptionsGetterForClient(
|
||||
resource.NewLocalResourceClient(server),
|
||||
resource.NewLocalResourceClient(server, nil),
|
||||
nil, // secrets
|
||||
originalStorageConfig,
|
||||
nil,
|
||||
|
||||
@@ -156,7 +156,7 @@ func testSetup(t testing.TB, opts ...setupOption) (context.Context, storage.Inte
|
||||
default:
|
||||
t.Fatalf("unsupported storage type: %s", setupOpts.storageType)
|
||||
}
|
||||
client := resource.NewLocalResourceClient(server)
|
||||
client := resource.NewLocalResourceClient(server, nil)
|
||||
|
||||
config := storagebackend.NewDefaultConfig(setupOpts.prefix, setupOpts.codec)
|
||||
store, destroyFunc, err := apistore.NewStorage(
|
||||
|
||||
@@ -21,6 +21,7 @@ import (
|
||||
"github.com/grafana/dskit/grpcclient"
|
||||
"github.com/grafana/dskit/middleware"
|
||||
"github.com/grafana/dskit/services"
|
||||
|
||||
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"
|
||||
infraDB "github.com/grafana/grafana/pkg/infra/db"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
@@ -135,7 +136,7 @@ func newClient(opts options.StorageOptions,
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return resource.NewLocalResourceClient(server), nil
|
||||
return resource.NewLocalResourceClient(server, nil), nil
|
||||
|
||||
case options.StorageTypeUnifiedGrpc:
|
||||
if opts.Address == "" {
|
||||
@@ -224,11 +225,11 @@ func newClient(opts options.StorageOptions,
|
||||
serverOptions.OverridesService = overridesSvc
|
||||
}
|
||||
|
||||
server, err := sql.NewResourceServer(serverOptions)
|
||||
server, searchServer, err := sql.NewResourceServer(serverOptions)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return resource.NewLocalResourceClient(server), nil
|
||||
return resource.NewLocalResourceClient(server, searchServer), nil
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -31,11 +31,14 @@ import (
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
||||
)
|
||||
|
||||
type SearchClient interface {
|
||||
resourcepb.ResourceIndexClient
|
||||
resourcepb.ManagedObjectIndexClient
|
||||
}
|
||||
|
||||
//go:generate mockery --name ResourceClient --structname MockResourceClient --inpackage --filename client_mock.go --with-expecter
|
||||
type ResourceClient interface {
|
||||
resourcepb.ResourceStoreClient
|
||||
resourcepb.ResourceIndexClient
|
||||
resourcepb.ManagedObjectIndexClient
|
||||
resourcepb.BulkStoreClient
|
||||
resourcepb.BlobStoreClient
|
||||
resourcepb.DiagnosticsClient
|
||||
@@ -100,8 +103,6 @@ func NewLocalResourceClient(server ResourceServer) ResourceClient {
|
||||
grpcAuthInt := grpcutils.NewUnsafeAuthenticator(tracer)
|
||||
for _, desc := range []*grpc.ServiceDesc{
|
||||
&resourcepb.ResourceStore_ServiceDesc,
|
||||
&resourcepb.ResourceIndex_ServiceDesc,
|
||||
&resourcepb.ManagedObjectIndex_ServiceDesc,
|
||||
&resourcepb.BlobStore_ServiceDesc,
|
||||
&resourcepb.BulkStore_ServiceDesc,
|
||||
&resourcepb.Diagnostics_ServiceDesc,
|
||||
|
||||
@@ -3,9 +3,11 @@ package resource
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/grafana/grafana-app-sdk/app"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
|
||||
@@ -101,6 +103,9 @@ type IndexableDocument struct {
|
||||
// metadata, annotations, or external data linked at index time
|
||||
Fields map[string]any `json:"fields,omitempty"`
|
||||
|
||||
// Automatically indexed selectable fields, used for field-based filtering when listing.
|
||||
SelectableFields map[string]string `json:"selectable_fields,omitempty"`
|
||||
|
||||
// Maintain a list of resource references.
|
||||
// Someday this will likely be part of https://github.com/grafana/gamma
|
||||
References ResourceReferences `json:"references,omitempty"`
|
||||
@@ -175,7 +180,7 @@ func (m ResourceReferences) Less(i, j int) bool {
|
||||
}
|
||||
|
||||
// Create a new indexable document based on a generic k8s resource
|
||||
func NewIndexableDocument(key *resourcepb.ResourceKey, rv int64, obj utils.GrafanaMetaAccessor) *IndexableDocument {
|
||||
func NewIndexableDocument(key *resourcepb.ResourceKey, rv int64, obj utils.GrafanaMetaAccessor, selectableFields map[string]string) *IndexableDocument {
|
||||
title := obj.FindTitle(key.Name)
|
||||
if title == key.Name {
|
||||
// TODO: something wrong with FindTitle
|
||||
@@ -191,14 +196,15 @@ func NewIndexableDocument(key *resourcepb.ResourceKey, rv int64, obj utils.Grafa
|
||||
}
|
||||
}
|
||||
doc := &IndexableDocument{
|
||||
Key: key,
|
||||
RV: rv,
|
||||
Name: key.Name,
|
||||
Title: title, // We always want *something* to display
|
||||
Labels: obj.GetLabels(),
|
||||
Folder: obj.GetFolder(),
|
||||
CreatedBy: obj.GetCreatedBy(),
|
||||
UpdatedBy: obj.GetUpdatedBy(),
|
||||
Key: key,
|
||||
RV: rv,
|
||||
Name: key.Name,
|
||||
Title: title, // We always want *something* to display
|
||||
Labels: obj.GetLabels(),
|
||||
Folder: obj.GetFolder(),
|
||||
CreatedBy: obj.GetCreatedBy(),
|
||||
UpdatedBy: obj.GetUpdatedBy(),
|
||||
SelectableFields: selectableFields,
|
||||
}
|
||||
m, ok := obj.GetManagerProperties()
|
||||
if ok {
|
||||
@@ -220,11 +226,14 @@ func NewIndexableDocument(key *resourcepb.ResourceKey, rv int64, obj utils.Grafa
|
||||
return doc.UpdateCopyFields()
|
||||
}
|
||||
|
||||
func StandardDocumentBuilder() DocumentBuilder {
|
||||
return &standardDocumentBuilder{}
|
||||
func StandardDocumentBuilder(manifests []app.Manifest) DocumentBuilder {
|
||||
return &standardDocumentBuilder{selectableFields: SelectableFieldsForManifests(manifests)}
|
||||
}
|
||||
|
||||
type standardDocumentBuilder struct{}
|
||||
type standardDocumentBuilder struct {
|
||||
// Maps "group/resource" (in lowercase) to list of selectable fields.
|
||||
selectableFields map[string][]string
|
||||
}
|
||||
|
||||
func (s *standardDocumentBuilder) BuildDocument(ctx context.Context, key *resourcepb.ResourceKey, rv int64, value []byte) (*IndexableDocument, error) {
|
||||
tmp := &unstructured.Unstructured{}
|
||||
@@ -238,10 +247,36 @@ func (s *standardDocumentBuilder) BuildDocument(ctx context.Context, key *resour
|
||||
return nil, err
|
||||
}
|
||||
|
||||
doc := NewIndexableDocument(key, rv, obj)
|
||||
sfKey := strings.ToLower(key.GetGroup() + "/" + key.GetResource())
|
||||
selectableFields := buildSelectableFields(tmp, s.selectableFields[sfKey])
|
||||
|
||||
doc := NewIndexableDocument(key, rv, obj, selectableFields)
|
||||
return doc, nil
|
||||
}
|
||||
|
||||
func buildSelectableFields(tmp *unstructured.Unstructured, fields []string) map[string]string {
|
||||
result := map[string]string{}
|
||||
|
||||
for _, field := range fields {
|
||||
path := strings.Split(field, ".")
|
||||
val, ok, err := unstructured.NestedFieldNoCopy(tmp.Object, path...)
|
||||
if err != nil || !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
switch v := val.(type) {
|
||||
case string:
|
||||
result[field] = v
|
||||
case bool:
|
||||
result[field] = strconv.FormatBool(v)
|
||||
case int, float64:
|
||||
result[field] = fmt.Sprintf("%v", v)
|
||||
}
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
type searchableDocumentFields struct {
|
||||
names []string
|
||||
fields map[string]*resourceTableColumn
|
||||
|
||||
@@ -14,7 +14,7 @@ import (
|
||||
|
||||
func TestStandardDocumentBuilder(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
builder := StandardDocumentBuilder()
|
||||
builder := StandardDocumentBuilder(nil)
|
||||
|
||||
body, err := os.ReadFile("testdata/playlist-resource.json")
|
||||
require.NoError(t, err)
|
||||
|
||||
@@ -127,7 +127,10 @@ type SearchBackend interface {
|
||||
GetOpenIndexes() []NamespacedResource
|
||||
}
|
||||
|
||||
// This supports indexing+search regardless of implementation
|
||||
var _ SearchServer = &searchSupport{}
|
||||
|
||||
// This supports indexing+search regardless of implementation.
|
||||
// Implements SearchServer interface.
|
||||
type searchSupport struct {
|
||||
log log.Logger
|
||||
storage StorageBackend
|
||||
@@ -160,6 +163,10 @@ var (
|
||||
_ resourcepb.ManagedObjectIndexServer = (*searchSupport)(nil)
|
||||
)
|
||||
|
||||
func NewSearchServer(opts SearchOptions, storage StorageBackend, access types.AccessClient, blob BlobSupport, indexMetrics *BleveIndexMetrics, ownsIndexFn func(key NamespacedResource) (bool, error)) (SearchServer, error) {
|
||||
return newSearchSupport(opts, storage, access, blob, indexMetrics, ownsIndexFn)
|
||||
}
|
||||
|
||||
func newSearchSupport(opts SearchOptions, storage StorageBackend, access types.AccessClient, blob BlobSupport, indexMetrics *BleveIndexMetrics, ownsIndexFn func(key NamespacedResource) (bool, error)) (support *searchSupport, err error) {
|
||||
// No backend search support
|
||||
if opts.Backend == nil {
|
||||
@@ -598,6 +605,15 @@ func (s *searchSupport) buildIndexes(ctx context.Context) (int, error) {
|
||||
return totalBatchesIndexed, nil
|
||||
}
|
||||
|
||||
func (s *searchSupport) Init(ctx context.Context) error {
|
||||
return s.init(ctx)
|
||||
}
|
||||
|
||||
func (s *searchSupport) Stop(_ context.Context) error {
|
||||
s.stop()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *searchSupport) init(ctx context.Context) error {
|
||||
origCtx := ctx
|
||||
|
||||
|
||||
@@ -60,7 +60,7 @@ func ProvideSearchDistributorServer(cfg *setting.Cfg, features featuremgmt.Featu
|
||||
}
|
||||
|
||||
type RingClient struct {
|
||||
Client ResourceClient
|
||||
Client SearchClient
|
||||
grpc_health_v1.HealthClient
|
||||
Conn *grpc.ClientConn
|
||||
}
|
||||
@@ -99,7 +99,7 @@ var (
|
||||
func (ds *distributorServer) Search(ctx context.Context, r *resourcepb.ResourceSearchRequest) (*resourcepb.ResourceSearchResponse, error) {
|
||||
ctx, span := ds.tracing.Start(ctx, "distributor.Search")
|
||||
defer span.End()
|
||||
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Options.Key.Namespace, "Search")
|
||||
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Options.Key.Namespace)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -110,7 +110,7 @@ func (ds *distributorServer) Search(ctx context.Context, r *resourcepb.ResourceS
|
||||
func (ds *distributorServer) GetStats(ctx context.Context, r *resourcepb.ResourceStatsRequest) (*resourcepb.ResourceStatsResponse, error) {
|
||||
ctx, span := ds.tracing.Start(ctx, "distributor.GetStats")
|
||||
defer span.End()
|
||||
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace, "GetStats")
|
||||
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -215,7 +215,7 @@ func (ds *distributorServer) RebuildIndexes(ctx context.Context, r *resourcepb.R
|
||||
func (ds *distributorServer) CountManagedObjects(ctx context.Context, r *resourcepb.CountManagedObjectsRequest) (*resourcepb.CountManagedObjectsResponse, error) {
|
||||
ctx, span := ds.tracing.Start(ctx, "distributor.CountManagedObjects")
|
||||
defer span.End()
|
||||
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace, "CountManagedObjects")
|
||||
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -226,7 +226,7 @@ func (ds *distributorServer) CountManagedObjects(ctx context.Context, r *resourc
|
||||
func (ds *distributorServer) ListManagedObjects(ctx context.Context, r *resourcepb.ListManagedObjectsRequest) (*resourcepb.ListManagedObjectsResponse, error) {
|
||||
ctx, span := ds.tracing.Start(ctx, "distributor.ListManagedObjects")
|
||||
defer span.End()
|
||||
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace, "ListManagedObjects")
|
||||
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -234,7 +234,7 @@ func (ds *distributorServer) ListManagedObjects(ctx context.Context, r *resource
|
||||
return client.ListManagedObjects(ctx, r)
|
||||
}
|
||||
|
||||
func (ds *distributorServer) getClientToDistributeRequest(ctx context.Context, namespace string, methodName string) (context.Context, ResourceClient, error) {
|
||||
func (ds *distributorServer) getClientToDistributeRequest(ctx context.Context, namespace string) (context.Context, SearchClient, error) {
|
||||
ringHasher := fnv.New32a()
|
||||
_, err := ringHasher.Write([]byte(namespace))
|
||||
if err != nil {
|
||||
|
||||
45
pkg/storage/unified/resource/selectable_fields.go
Normal file
45
pkg/storage/unified/resource/selectable_fields.go
Normal file
@@ -0,0 +1,45 @@
|
||||
package resource
|
||||
|
||||
import (
|
||||
"strings"
|
||||
|
||||
"github.com/grafana/grafana-app-sdk/app"
|
||||
|
||||
folder "github.com/grafana/grafana/apps/folder/pkg/apis/manifestdata"
|
||||
iam "github.com/grafana/grafana/apps/iam/pkg/apis"
|
||||
)
|
||||
|
||||
func AppManifests() []app.Manifest {
|
||||
return []app.Manifest{
|
||||
iam.LocalManifest(),
|
||||
folder.LocalManifest(),
|
||||
}
|
||||
}
|
||||
|
||||
func SelectableFields() map[string][]string {
|
||||
return SelectableFieldsForManifests(AppManifests())
|
||||
}
|
||||
|
||||
// SelectableFieldsForManifests returns map of <group/kind> to list of selectable fields.
|
||||
// Also <group/plural> is included as a key, pointing to the same fields.
|
||||
func SelectableFieldsForManifests(manifests []app.Manifest) map[string][]string {
|
||||
fields := map[string][]string{}
|
||||
|
||||
for _, m := range manifests {
|
||||
group := m.ManifestData.Group
|
||||
|
||||
for _, version := range m.ManifestData.Versions {
|
||||
for _, kind := range version.Kinds {
|
||||
key := strings.ToLower(group + "/" + kind.Kind)
|
||||
keyPlural := strings.ToLower(group + "/" + kind.Plural)
|
||||
|
||||
if len(kind.SelectableFields) > 0 {
|
||||
fields[key] = kind.SelectableFields
|
||||
fields[keyPlural] = kind.SelectableFields
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return fields
|
||||
}
|
||||
@@ -12,6 +12,8 @@ import (
|
||||
|
||||
"github.com/Masterminds/semver"
|
||||
"github.com/google/uuid"
|
||||
claims "github.com/grafana/authlib/types"
|
||||
"github.com/grafana/dskit/backoff"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
@@ -20,9 +22,6 @@ import (
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
|
||||
claims "github.com/grafana/authlib/types"
|
||||
"github.com/grafana/dskit/backoff"
|
||||
|
||||
"github.com/grafana/grafana/pkg/apimachinery/utils"
|
||||
"github.com/grafana/grafana/pkg/apimachinery/validation"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
@@ -33,12 +32,17 @@ import (
|
||||
|
||||
var tracer = otel.Tracer("github.com/grafana/grafana/pkg/storage/unified/resource")
|
||||
|
||||
type SearchServer interface {
|
||||
LifecycleHooks
|
||||
|
||||
resourcepb.ResourceIndexServer
|
||||
resourcepb.ManagedObjectIndexServer
|
||||
}
|
||||
|
||||
// ResourceServer implements all gRPC services
|
||||
type ResourceServer interface {
|
||||
resourcepb.ResourceStoreServer
|
||||
resourcepb.BulkStoreServer
|
||||
resourcepb.ResourceIndexServer
|
||||
resourcepb.ManagedObjectIndexServer
|
||||
resourcepb.BlobStoreServer
|
||||
resourcepb.DiagnosticsServer
|
||||
resourcepb.QuotasServer
|
||||
@@ -221,7 +225,8 @@ type ResourceServerOptions struct {
|
||||
Blob BlobConfig
|
||||
|
||||
// Search options
|
||||
Search SearchOptions
|
||||
SearchOptions SearchOptions // TODO: needed?
|
||||
Search SearchServer
|
||||
|
||||
// Quota service
|
||||
OverridesService *OverridesService
|
||||
@@ -250,16 +255,12 @@ type ResourceServerOptions struct {
|
||||
|
||||
storageMetrics *StorageMetrics
|
||||
|
||||
IndexMetrics *BleveIndexMetrics
|
||||
|
||||
// MaxPageSizeBytes is the maximum size of a page in bytes.
|
||||
MaxPageSizeBytes int
|
||||
|
||||
// QOSQueue is the quality of service queue used to enqueue
|
||||
QOSQueue QOSEnqueuer
|
||||
QOSConfig QueueConfig
|
||||
|
||||
OwnsIndexFn func(key NamespacedResource) (bool, error)
|
||||
}
|
||||
|
||||
func NewResourceServer(opts ResourceServerOptions) (*server, error) {
|
||||
@@ -342,23 +343,25 @@ func NewResourceServer(opts ResourceServerOptions) (*server, error) {
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
storageMetrics: opts.storageMetrics,
|
||||
indexMetrics: opts.IndexMetrics,
|
||||
maxPageSizeBytes: opts.MaxPageSizeBytes,
|
||||
reg: opts.Reg,
|
||||
queue: opts.QOSQueue,
|
||||
queueConfig: opts.QOSConfig,
|
||||
overridesService: opts.OverridesService,
|
||||
search: opts.Search,
|
||||
|
||||
artificialSuccessfulWriteDelay: opts.Search.IndexMinUpdateInterval,
|
||||
artificialSuccessfulWriteDelay: opts.SearchOptions.IndexMinUpdateInterval,
|
||||
}
|
||||
|
||||
if opts.Search.Resources != nil {
|
||||
var err error
|
||||
s.search, err = newSearchSupport(opts.Search, s.backend, s.access, s.blob, opts.IndexMetrics, opts.OwnsIndexFn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
/*
|
||||
if opts.Search.Resources != nil {
|
||||
var err error
|
||||
s.search, err = newSearchSupport(opts.Search, s.backend, s.access, s.blob, opts.IndexMetrics, opts.OwnsIndexFn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
err := s.Init(ctx)
|
||||
if err != nil {
|
||||
@@ -376,7 +379,7 @@ type server struct {
|
||||
backend StorageBackend
|
||||
blob BlobSupport
|
||||
secure secrets.InlineSecureValueSupport
|
||||
search *searchSupport
|
||||
search SearchServer
|
||||
diagnostics resourcepb.DiagnosticsServer
|
||||
access claims.AccessClient
|
||||
writeHooks WriteAccessHooks
|
||||
@@ -423,11 +426,6 @@ func (s *server) Init(ctx context.Context) error {
|
||||
s.initErr = s.overridesService.init(ctx)
|
||||
}
|
||||
|
||||
// initialize the search index
|
||||
if s.initErr == nil && s.search != nil {
|
||||
s.initErr = s.search.init(ctx)
|
||||
}
|
||||
|
||||
// Start watching for changes
|
||||
if s.initErr == nil {
|
||||
s.initErr = s.initWatcher()
|
||||
@@ -452,10 +450,6 @@ func (s *server) Stop(ctx context.Context) error {
|
||||
}
|
||||
}
|
||||
|
||||
if s.search != nil {
|
||||
s.search.stop()
|
||||
}
|
||||
|
||||
if s.overridesService != nil {
|
||||
if err := s.overridesService.stop(ctx); err != nil {
|
||||
stopFailed = true
|
||||
@@ -1043,6 +1037,68 @@ func (s *server) List(ctx context.Context, req *resourcepb.ListRequest) (*resour
|
||||
return rsp, nil
|
||||
}
|
||||
|
||||
// Remove metadata.namespace filter from requirement fields, if it's present.
|
||||
for ix := 0; ix < len(req.Options.Fields); {
|
||||
v := req.Options.Fields[ix]
|
||||
if v.Key == "metadata.namespace" && v.Operator == "=" {
|
||||
if len(v.Values) == 1 && v.Values[0] == req.Options.Key.Namespace {
|
||||
// Remove this requirement from fields, as it's implied by the key.namespace.
|
||||
req.Options.Fields = append(req.Options.Fields[:ix], req.Options.Fields[ix+1:]...)
|
||||
// Don't increment ix, as we're removing an element from the slice.
|
||||
continue
|
||||
}
|
||||
}
|
||||
ix++
|
||||
}
|
||||
|
||||
// TODO: What to do about RV and version_match fields?
|
||||
// If we get here, we're doing list with selectable fields. Let's do search instead, since
|
||||
// we index all selectable fields, and fetch resulting documents one by one.
|
||||
if s.search != nil && req.Source == resourcepb.ListRequest_STORE && (len(req.Options.Fields) > 0) {
|
||||
if req.Options.Key.Namespace == "" {
|
||||
return &resourcepb.ListResponse{
|
||||
Error: NewBadRequestError("namespace must be specified for list with filter"),
|
||||
}, nil
|
||||
}
|
||||
|
||||
srq := &resourcepb.ResourceSearchRequest{
|
||||
Options: req.Options,
|
||||
//Federated: nil,
|
||||
Limit: req.Limit,
|
||||
// Offset: req.NextPageToken, // TODO
|
||||
// Page: 0,
|
||||
// Permission: 0, // Not needed, default is List
|
||||
}
|
||||
|
||||
searchResp, err := s.search.Search(ctx, srq)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rsp := &resourcepb.ListResponse{}
|
||||
// Using searchResp.GetResults().GetRows() will not panic if anything is nil on the path.
|
||||
for _, row := range searchResp.GetResults().GetRows() {
|
||||
// TODO: use batch reading
|
||||
val, err := s.Read(ctx, &resourcepb.ReadRequest{
|
||||
Key: row.Key,
|
||||
ResourceVersion: row.ResourceVersion,
|
||||
})
|
||||
if err != nil {
|
||||
return &resourcepb.ListResponse{Error: AsErrorResult(err)}, nil
|
||||
}
|
||||
if len(val.Value) > 0 {
|
||||
rsp.Items = append(rsp.Items, &resourcepb.ResourceWrapper{
|
||||
Value: val.Value,
|
||||
ResourceVersion: val.ResourceVersion,
|
||||
})
|
||||
if val.ResourceVersion > rsp.ResourceVersion {
|
||||
rsp.ResourceVersion = val.ResourceVersion
|
||||
}
|
||||
}
|
||||
}
|
||||
return rsp, nil
|
||||
}
|
||||
|
||||
if req.Limit < 1 {
|
||||
req.Limit = 500 // default max 500 items in a page
|
||||
}
|
||||
@@ -1371,47 +1427,6 @@ func (s *server) Watch(req *resourcepb.WatchRequest, srv resourcepb.ResourceStor
|
||||
}
|
||||
}
|
||||
|
||||
func (s *server) Search(ctx context.Context, req *resourcepb.ResourceSearchRequest) (*resourcepb.ResourceSearchResponse, error) {
|
||||
if s.search == nil {
|
||||
return nil, fmt.Errorf("search index not configured")
|
||||
}
|
||||
|
||||
return s.search.Search(ctx, req)
|
||||
}
|
||||
|
||||
// GetStats implements ResourceServer.
|
||||
func (s *server) GetStats(ctx context.Context, req *resourcepb.ResourceStatsRequest) (*resourcepb.ResourceStatsResponse, error) {
|
||||
if err := s.Init(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if s.search == nil {
|
||||
// If the backend implements "GetStats", we can use it
|
||||
srv, ok := s.backend.(resourcepb.ResourceIndexServer)
|
||||
if ok {
|
||||
return srv.GetStats(ctx, req)
|
||||
}
|
||||
return nil, fmt.Errorf("search index not configured")
|
||||
}
|
||||
return s.search.GetStats(ctx, req)
|
||||
}
|
||||
|
||||
func (s *server) ListManagedObjects(ctx context.Context, req *resourcepb.ListManagedObjectsRequest) (*resourcepb.ListManagedObjectsResponse, error) {
|
||||
if s.search == nil {
|
||||
return nil, fmt.Errorf("search index not configured")
|
||||
}
|
||||
|
||||
return s.search.ListManagedObjects(ctx, req)
|
||||
}
|
||||
|
||||
func (s *server) CountManagedObjects(ctx context.Context, req *resourcepb.CountManagedObjectsRequest) (*resourcepb.CountManagedObjectsResponse, error) {
|
||||
if s.search == nil {
|
||||
return nil, fmt.Errorf("search index not configured")
|
||||
}
|
||||
|
||||
return s.search.CountManagedObjects(ctx, req)
|
||||
}
|
||||
|
||||
// IsHealthy implements ResourceServer.
|
||||
func (s *server) IsHealthy(ctx context.Context, req *resourcepb.HealthCheckRequest) (*resourcepb.HealthCheckResponse, error) {
|
||||
return s.diagnostics.IsHealthy(ctx, req)
|
||||
|
||||
@@ -59,7 +59,6 @@ type kvStorageBackend struct {
|
||||
dataStore *dataStore
|
||||
eventStore *eventStore
|
||||
notifier *notifier
|
||||
builder DocumentBuilder
|
||||
log logging.Logger
|
||||
withPruner bool
|
||||
eventRetentionPeriod time.Duration
|
||||
@@ -109,8 +108,7 @@ func NewKVStorageBackend(opts KVBackendOptions) (StorageBackend, error) {
|
||||
eventStore: eventStore,
|
||||
notifier: newNotifier(eventStore, notifierOptions{}),
|
||||
snowflake: s,
|
||||
builder: StandardDocumentBuilder(), // For now we use the standard document builder.
|
||||
log: &logging.NoOpLogger{}, // Make this configurable
|
||||
log: &logging.NoOpLogger{}, // Make this configurable
|
||||
eventRetentionPeriod: eventRetentionPeriod,
|
||||
eventPruningInterval: eventPruningInterval,
|
||||
withExperimentalClusterScope: opts.WithExperimentalClusterScope,
|
||||
|
||||
@@ -93,6 +93,8 @@ type bleveBackend struct {
|
||||
|
||||
indexMetrics *resource.BleveIndexMetrics
|
||||
|
||||
selectableFields map[string][]string
|
||||
|
||||
bgTasksCancel func()
|
||||
bgTasksWg sync.WaitGroup
|
||||
}
|
||||
@@ -135,11 +137,12 @@ func NewBleveBackend(opts BleveOptions, indexMetrics *resource.BleveIndexMetrics
|
||||
}
|
||||
|
||||
be := &bleveBackend{
|
||||
log: l,
|
||||
cache: map[resource.NamespacedResource]*bleveIndex{},
|
||||
opts: opts,
|
||||
ownsIndexFn: ownFn,
|
||||
indexMetrics: indexMetrics,
|
||||
log: l,
|
||||
cache: map[resource.NamespacedResource]*bleveIndex{},
|
||||
opts: opts,
|
||||
ownsIndexFn: ownFn,
|
||||
indexMetrics: indexMetrics,
|
||||
selectableFields: resource.SelectableFields(),
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
@@ -366,7 +369,9 @@ func (b *bleveBackend) BuildIndex(
|
||||
attribute.String("reason", indexBuildReason),
|
||||
)
|
||||
|
||||
mapper, err := GetBleveMappings(fields)
|
||||
selectableFields := b.selectableFields[fmt.Sprintf("%s/%s", key.Group, key.Resource)]
|
||||
|
||||
mapper, err := GetBleveMappings(fields, selectableFields)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -459,7 +464,7 @@ func (b *bleveBackend) BuildIndex(
|
||||
}
|
||||
|
||||
// Batch all the changes
|
||||
idx := b.newBleveIndex(key, index, newIndexType, fields, allFields, standardSearchFields, updater, b.log.New("namespace", key.Namespace, "group", key.Group, "resource", key.Resource))
|
||||
idx := b.newBleveIndex(key, index, newIndexType, fields, allFields, standardSearchFields, selectableFields, updater, b.log.New("namespace", key.Namespace, "group", key.Group, "resource", key.Resource))
|
||||
|
||||
if build {
|
||||
if b.indexMetrics != nil {
|
||||
@@ -699,8 +704,9 @@ type bleveIndex struct {
|
||||
// Subsequent update requests only trigger new update if minUpdateInterval has elapsed.
|
||||
nextUpdateTime time.Time
|
||||
|
||||
standard resource.SearchableDocumentFields
|
||||
fields resource.SearchableDocumentFields
|
||||
standard resource.SearchableDocumentFields
|
||||
fields resource.SearchableDocumentFields
|
||||
selectableFields []string
|
||||
|
||||
indexStorage string // memory or file, used when updating metrics
|
||||
|
||||
@@ -736,6 +742,7 @@ func (b *bleveBackend) newBleveIndex(
|
||||
fields resource.SearchableDocumentFields,
|
||||
allFields []*resourcepb.ResourceTableColumnDefinition,
|
||||
standardSearchFields resource.SearchableDocumentFields,
|
||||
selectableFields []string,
|
||||
updaterFn resource.UpdateFn,
|
||||
logger log.Logger,
|
||||
) *bleveIndex {
|
||||
@@ -745,6 +752,7 @@ func (b *bleveBackend) newBleveIndex(
|
||||
indexStorage: newIndexType,
|
||||
fields: fields,
|
||||
allFields: allFields,
|
||||
selectableFields: selectableFields,
|
||||
standard: standardSearchFields,
|
||||
logger: logger,
|
||||
updaterFn: updaterFn,
|
||||
@@ -1215,7 +1223,11 @@ func (b *bleveIndex) toBleveSearchRequest(ctx context.Context, req *resourcepb.R
|
||||
// filters
|
||||
if len(req.Options.Fields) > 0 {
|
||||
for _, v := range req.Options.Fields {
|
||||
q, err := requirementQuery(v, "")
|
||||
prefix := ""
|
||||
if b.isSelectableField(v.Key) {
|
||||
prefix = "selectable_fields."
|
||||
}
|
||||
q, err := requirementQuery(v, prefix)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -1787,6 +1799,15 @@ func (b *bleveIndex) hitsToTable(ctx context.Context, selectFields []string, hit
|
||||
return table, nil
|
||||
}
|
||||
|
||||
func (b *bleveIndex) isSelectableField(key string) bool {
|
||||
for _, f := range b.selectableFields {
|
||||
if key == f {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func getAllFields(standard resource.SearchableDocumentFields, custom resource.SearchableDocumentFields) ([]*resourcepb.ResourceTableColumnDefinition, error) {
|
||||
fields := []*resourcepb.ResourceTableColumnDefinition{
|
||||
standard.Field(resource.SEARCH_FIELD_ID),
|
||||
|
||||
@@ -10,19 +10,19 @@ import (
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
||||
)
|
||||
|
||||
func GetBleveMappings(fields resource.SearchableDocumentFields) (mapping.IndexMapping, error) {
|
||||
func GetBleveMappings(fields resource.SearchableDocumentFields, selectableFields []string) (mapping.IndexMapping, error) {
|
||||
mapper := bleve.NewIndexMapping()
|
||||
|
||||
err := RegisterCustomAnalyzers(mapper)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
mapper.DefaultMapping = getBleveDocMappings(fields)
|
||||
mapper.DefaultMapping = getBleveDocMappings(fields, selectableFields)
|
||||
|
||||
return mapper, nil
|
||||
}
|
||||
|
||||
func getBleveDocMappings(fields resource.SearchableDocumentFields) *mapping.DocumentMapping {
|
||||
func getBleveDocMappings(fields resource.SearchableDocumentFields, selectableFields []string) *mapping.DocumentMapping {
|
||||
mapper := bleve.NewDocumentStaticMapping()
|
||||
|
||||
nameMapping := &mapping.FieldMapping{
|
||||
@@ -165,5 +165,73 @@ func getBleveDocMappings(fields resource.SearchableDocumentFields) *mapping.Docu
|
||||
|
||||
mapper.AddSubDocumentMapping("fields", fieldMapper)
|
||||
|
||||
selectableFieldsMapper := bleve.NewDocumentStaticMapping()
|
||||
for _, field := range selectableFields {
|
||||
selectableFieldsMapper.AddFieldMappingsAt(field, &mapping.FieldMapping{
|
||||
Name: field,
|
||||
Type: "text",
|
||||
Analyzer: keyword.Name,
|
||||
Store: false,
|
||||
Index: true,
|
||||
})
|
||||
}
|
||||
mapper.AddSubDocumentMapping("selectable_fields", selectableFieldsMapper)
|
||||
|
||||
return mapper
|
||||
}
|
||||
|
||||
/*
|
||||
Here's a tree representation of the field mappings in pkg/storage/unified/search/bleve_mappings.go:
|
||||
|
||||
Document Root (DefaultMapping)
|
||||
│
|
||||
├── name [text, keyword analyzer]
|
||||
│
|
||||
├── title_phrase [keyword, not stored]
|
||||
│
|
||||
├── title [3 mappings]
|
||||
│ ├── [1] standard analyzer, stored
|
||||
│ ├── [2] TITLE_ANALYZER (edge ngram), not stored
|
||||
│ └── [3] keyword, not stored
|
||||
│
|
||||
├── description [text, stored]
|
||||
│
|
||||
├── tags [text, keyword analyzer, stored, includeInAll]
|
||||
│
|
||||
├── folder [text, keyword analyzer, stored, includeInAll, docValues]
|
||||
│
|
||||
├── managedBy [text, keyword analyzer, not stored]
|
||||
│
|
||||
├── source/ [sub-document]
|
||||
│ ├── path [text, keyword analyzer, stored]
|
||||
│ ├── checksum [text, keyword analyzer, stored]
|
||||
│ └── timestampMillis [numeric]
|
||||
│
|
||||
├── manager/ [sub-document]
|
||||
│ ├── kind [text, keyword analyzer, stored, includeInAll]
|
||||
│ └── id [text, keyword analyzer, stored, includeInAll]
|
||||
│
|
||||
├── reference/ [sub-document, default analyzer: keyword]
|
||||
│ └── (dynamic fields inherit keyword analyzer)
|
||||
│
|
||||
├── labels/ [sub-document]
|
||||
│ └── (dynamic fields)
|
||||
│
|
||||
└── fields/ [sub-document]
|
||||
└── (conditional mappings)
|
||||
├── {filterable string fields} [keyword, stored]
|
||||
└── {other fields} [dynamically mapped by Bleve]
|
||||
|
||||
Key observations:
|
||||
|
||||
- Root level has standard searchable fields (name, title, description, tags, folder)
|
||||
- title has 3 analyzers applied: standard (for word search), edge ngram (for prefix search), and keyword (for phrase sorting)
|
||||
- source/, manager/: Static sub-documents with explicitly mapped fields
|
||||
- reference/: Dynamic sub-document with keyword default analyzer (line 142)
|
||||
- labels/, fields/: Dynamic sub-documents where Bleve auto-detects field types at index time
|
||||
|
||||
References:
|
||||
- Main mapping function: pkg/storage/unified/search/bleve_mappings.go:25-169
|
||||
- Sub-document mappings: lines 88-143
|
||||
- Dynamic fields handling: lines 148-166
|
||||
*/
|
||||
|
||||
@@ -13,7 +13,7 @@ import (
|
||||
)
|
||||
|
||||
func TestDocumentMapping(t *testing.T) {
|
||||
mappings, err := search.GetBleveMappings(nil)
|
||||
mappings, err := search.GetBleveMappings(nil, nil)
|
||||
require.NoError(t, err)
|
||||
data := resource.IndexableDocument{
|
||||
Title: "title",
|
||||
|
||||
@@ -264,7 +264,7 @@ func (s *DashboardDocumentBuilder) BuildDocument(ctx context.Context, key *resou
|
||||
summary.UID = obj.GetName()
|
||||
summary.ID = obj.GetDeprecatedInternalID() // nolint:staticcheck
|
||||
|
||||
doc := resource.NewIndexableDocument(key, rv, obj)
|
||||
doc := resource.NewIndexableDocument(key, rv, obj, nil)
|
||||
doc.Title = summary.Title
|
||||
doc.Description = summary.Description
|
||||
doc.Tags = summary.Tags
|
||||
|
||||
@@ -115,7 +115,7 @@ func TestDashboardDocumentBuilder(t *testing.T) {
|
||||
"aaa",
|
||||
})
|
||||
|
||||
builder = resource.StandardDocumentBuilder()
|
||||
builder = resource.StandardDocumentBuilder(nil)
|
||||
doSnapshotTests(t, builder, "folder", &resourcepb.ResourceKey{
|
||||
Namespace: "default",
|
||||
Group: "folder.grafana.app",
|
||||
|
||||
@@ -66,7 +66,7 @@ func (u *extGroupMappingDocumentBuilder) BuildDocument(ctx context.Context, key
|
||||
return nil, err
|
||||
}
|
||||
|
||||
doc := resource.NewIndexableDocument(key, rv, obj)
|
||||
doc := resource.NewIndexableDocument(key, rv, obj, nil)
|
||||
|
||||
doc.Fields = make(map[string]any)
|
||||
if extGroupMapping.Spec.TeamRef.Name != "" {
|
||||
|
||||
@@ -70,7 +70,7 @@ func (t *teamSearchBuilder) BuildDocument(ctx context.Context, key *resourcepb.R
|
||||
return nil, err
|
||||
}
|
||||
|
||||
doc := resource.NewIndexableDocument(key, rv, obj)
|
||||
doc := resource.NewIndexableDocument(key, rv, obj, nil)
|
||||
|
||||
doc.Fields = make(map[string]any)
|
||||
if team.Spec.Email != "" {
|
||||
|
||||
@@ -66,7 +66,7 @@ func (u *userDocumentBuilder) BuildDocument(ctx context.Context, key *resourcepb
|
||||
return nil, err
|
||||
}
|
||||
|
||||
doc := resource.NewIndexableDocument(key, rv, obj)
|
||||
doc := resource.NewIndexableDocument(key, rv, obj, nil)
|
||||
|
||||
doc.Fields = make(map[string]any)
|
||||
if user.Spec.Email != "" {
|
||||
|
||||
@@ -25,7 +25,7 @@ func (s *StandardDocumentBuilders) GetDocumentBuilders() ([]resource.DocumentBui
|
||||
|
||||
result := []resource.DocumentBuilderInfo{
|
||||
{
|
||||
Builder: resource.StandardDocumentBuilder(),
|
||||
Builder: resource.StandardDocumentBuilder(resource.AppManifests()),
|
||||
},
|
||||
}
|
||||
return append(result, all...), nil
|
||||
|
||||
@@ -46,7 +46,7 @@ type ServerOptions struct {
|
||||
OwnsIndexFn func(key resource.NamespacedResource) (bool, error)
|
||||
}
|
||||
|
||||
func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
|
||||
func NewResourceServer(opts ServerOptions) (resource.ResourceServer, resource.SearchServer, error) {
|
||||
apiserverCfg := opts.Cfg.SectionWithEnvOverrides("grafana-apiserver")
|
||||
|
||||
if opts.SecureValues == nil && opts.Cfg != nil && opts.Cfg.SecretsManagement.GrpcClientEnable {
|
||||
@@ -57,7 +57,7 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
|
||||
nil, // not needed for gRPC client mode
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create inline secure value service: %w", err)
|
||||
return nil, nil, fmt.Errorf("failed to create inline secure value service: %w", err)
|
||||
}
|
||||
opts.SecureValues = inlineSecureValueService
|
||||
}
|
||||
@@ -77,7 +77,7 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
|
||||
dir := strings.Replace(serverOptions.Blob.URL, "./data", opts.Cfg.DataPath, 1)
|
||||
err := os.MkdirAll(dir, 0700)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
serverOptions.Blob.URL = "file:///" + dir
|
||||
}
|
||||
@@ -94,7 +94,7 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
|
||||
} else {
|
||||
eDB, err := dbimpl.ProvideResourceDB(opts.DB, opts.Cfg, opts.Tracer)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
isHA := isHighAvailabilityEnabled(opts.Cfg.SectionWithEnvOverrides("database"),
|
||||
@@ -108,20 +108,31 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
|
||||
LastImportTimeMaxAge: opts.SearchOptions.MaxIndexAge, // No need to keep last_import_times older than max index age.
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
serverOptions.Backend = backend
|
||||
serverOptions.Diagnostics = backend
|
||||
serverOptions.Lifecycle = backend
|
||||
}
|
||||
|
||||
serverOptions.Search = opts.SearchOptions
|
||||
serverOptions.IndexMetrics = opts.IndexMetrics
|
||||
search, err := resource.NewSearchServer(opts.SearchOptions, opts.Backend, opts.AccessClient, nil, opts.IndexMetrics, opts.OwnsIndexFn)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to initialize search: %w", err)
|
||||
}
|
||||
|
||||
if err := search.Init(context.Background()); err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to initialize search: %w", err)
|
||||
}
|
||||
|
||||
serverOptions.Search = search
|
||||
serverOptions.QOSQueue = opts.QOSQueue
|
||||
serverOptions.OwnsIndexFn = opts.OwnsIndexFn
|
||||
serverOptions.OverridesService = opts.OverridesService
|
||||
|
||||
return resource.NewResourceServer(serverOptions)
|
||||
rs, err := resource.NewResourceServer(serverOptions)
|
||||
if err != nil {
|
||||
_ = search.Stop(context.Background())
|
||||
}
|
||||
return rs, nil, err
|
||||
}
|
||||
|
||||
// isHighAvailabilityEnabled determines if high availability mode should
|
||||
|
||||
@@ -291,7 +291,7 @@ func (s *service) starting(ctx context.Context) error {
|
||||
serverOptions.OverridesService = overridesSvc
|
||||
}
|
||||
|
||||
server, err := NewResourceServer(serverOptions)
|
||||
server, searchServer, err := NewResourceServer(serverOptions)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -308,8 +308,8 @@ func (s *service) starting(ctx context.Context) error {
|
||||
srv := s.handler.GetServer()
|
||||
resourcepb.RegisterResourceStoreServer(srv, server)
|
||||
resourcepb.RegisterBulkStoreServer(srv, server)
|
||||
resourcepb.RegisterResourceIndexServer(srv, server)
|
||||
resourcepb.RegisterManagedObjectIndexServer(srv, server)
|
||||
resourcepb.RegisterResourceIndexServer(srv, searchServer)
|
||||
resourcepb.RegisterManagedObjectIndexServer(srv, searchServer)
|
||||
resourcepb.RegisterBlobStoreServer(srv, server)
|
||||
resourcepb.RegisterDiagnosticsServer(srv, server)
|
||||
resourcepb.RegisterQuotasServer(srv, server)
|
||||
|
||||
Reference in New Issue
Block a user