Compare commits

...

3 Commits

Author SHA1 Message Date
konsalex
3e12e47704 chore: IAM work with Gabriel
Some checks failed
CodeQL checks / Detect whether code changed (push) Has been cancelled
CodeQL checks / Analyze (actions) (push) Has been cancelled
CodeQL checks / Analyze (go) (push) Has been cancelled
CodeQL checks / Analyze (javascript) (push) Has been cancelled
2025-11-25 11:15:26 +01:00
konsalex
939170c7d5 use FF and cleanup access code
Some checks failed
CodeQL checks / Detect whether code changed (push) Has been cancelled
CodeQL checks / Analyze (actions) (push) Has been cancelled
CodeQL checks / Analyze (go) (push) Has been cancelled
CodeQL checks / Analyze (javascript) (push) Has been cancelled
2025-11-24 16:26:05 +01:00
konsalex
c0af936bcd wip: apiextensions 2025-11-24 11:46:16 +01:00
25 changed files with 2798 additions and 25 deletions

46
conf/apiextensions.ini Normal file
View File

@@ -0,0 +1,46 @@
; Run locally unified storage with SQLite to test
; new API registration changes
app_mode = development
target = all
[log]
level = debug
[server]
; HTTPS is required for kubectl (but HTTP works for testing with curl)
protocol = https
http_port = 1111
[feature_toggles]
; Enable the apiextensions feature
apiExtensions = true
; Enable unified storage globally
unifiedStorage = true
; Enable search indexing for unified storage
unifiedStorageSearch = true
; Enable the grafana-apiserver explicitly
grafanaAPIServer = true
[grafana-apiserver]
; Use unified storage backed by SQL (uses your Grafana database)
storage_type = unified
; Configure dashboards to use unified storage
[unified_storage.dashboards.dashboard.grafana.app]
dualWriterMode = 5
; Configure folders to use unified storage (required for dashboards)
[unified_storage.folders.folder.grafana.app]
dualWriterMode = 5
[database]
; SQLite database for testing
type = sqlite3
path = grafana.db
high_availability = false
; Will only be used for the MT grafana
; apiextensions service
; [auth.extended_jwt]
; enabled = true
; jwks_url = "http://localhost:6481/jwks"

4
go.mod
View File

@@ -645,7 +645,7 @@ require (
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/src-d/go-errors.v1 v1.0.0 // indirect
gopkg.in/telebot.v3 v3.3.8 // indirect
k8s.io/apiextensions-apiserver v0.34.2 // indirect
k8s.io/apiextensions-apiserver v0.34.2
k8s.io/kms v0.34.2 // indirect
modernc.org/libc v1.66.10 // indirect
modernc.org/mathutil v1.7.1 // indirect
@@ -657,6 +657,8 @@ require (
require github.com/grafana/tempo v1.5.1-0.20250529124718-87c2dc380cec // @grafana/observability-traces-and-profiling
require github.com/evanphx/json-patch v5.9.11+incompatible
require (
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect
github.com/containerd/log v0.1.0 // indirect

View File

@@ -159,6 +159,7 @@ var serviceIdentityTokenPermissions = []string{
"query.grafana.app:*",
"iam.grafana.app:*",
"preferences.grafana.app:*",
"apiextensions.grafana.app:*",
// Secrets Manager uses a custom verb for secret decryption, and its authorizer does not allow wildcard permissions.
"secret.grafana.app/securevalues:decrypt",

View File

@@ -131,19 +131,31 @@ func NamespaceKeyFunc(gr schema.GroupResource) func(ctx context.Context, name st
}
}
// NoNamespaceKeyFunc is the default function for constructing storage paths
// to a resource relative to the given prefix without a namespace.
func NoNamespaceKeyFunc(ctx context.Context, prefix string, gr schema.GroupResource, name string) (string, error) {
if len(name) == 0 {
return "", apierrors.NewBadRequest("Name parameter required.")
// ClusterScopedKeyFunc constructs storage paths for cluster-scoped resources (no namespace).
func ClusterScopedKeyFunc(gr schema.GroupResource) func(ctx context.Context, name string) (string, error) {
return func(ctx context.Context, name string) (string, error) {
if len(name) == 0 {
return "", apierrors.NewBadRequest("Name parameter required.")
}
if msgs := path.IsValidPathSegmentName(name); len(msgs) != 0 {
return "", apierrors.NewBadRequest(fmt.Sprintf("Name parameter invalid: %q: %s", name, strings.Join(msgs, ";")))
}
key := &Key{
Group: gr.Group,
Resource: gr.Resource,
Name: name,
}
return key.String(), nil
}
}
// ClusterScopedKeyRootFunc is used by the generic registry store for cluster-scoped resources.
func ClusterScopedKeyRootFunc(gr schema.GroupResource) func(ctx context.Context) string {
return func(ctx context.Context) string {
key := &Key{
Group: gr.Group,
Resource: gr.Resource,
}
return key.String()
}
if msgs := path.IsValidPathSegmentName(name); len(msgs) != 0 {
return "", apierrors.NewBadRequest(fmt.Sprintf("Name parameter invalid: %q: %s", name, strings.Join(msgs, ";")))
}
key := &Key{
Group: gr.Group,
Resource: gr.Resource,
Name: name,
}
return prefix + key.String(), nil
}

View File

@@ -1,6 +1,8 @@
package generic
import (
"context"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/registry/generic"
"k8s.io/apiserver/pkg/registry/generic/registry"
@@ -12,16 +14,27 @@ func NewRegistryStore(scheme *runtime.Scheme, resourceInfo utils.ResourceInfo, o
gv := resourceInfo.GroupVersion()
gv.Version = runtime.APIVersionInternal
strategy := NewStrategy(scheme, gv)
gr := resourceInfo.GroupResource()
var keyRootFunc func(ctx context.Context) string
var keyFunc func(ctx context.Context, name string) (string, error)
if resourceInfo.IsClusterScoped() {
strategy = strategy.WithClusterScope()
keyRootFunc = ClusterScopedKeyRootFunc(gr)
keyFunc = ClusterScopedKeyFunc(gr)
} else {
keyRootFunc = KeyRootFunc(gr)
keyFunc = NamespaceKeyFunc(gr)
}
store := &registry.Store{
NewFunc: resourceInfo.NewFunc,
NewListFunc: resourceInfo.NewListFunc,
KeyRootFunc: KeyRootFunc(resourceInfo.GroupResource()),
KeyFunc: NamespaceKeyFunc(resourceInfo.GroupResource()),
KeyRootFunc: keyRootFunc,
KeyFunc: keyFunc,
PredicateFunc: Matcher,
DefaultQualifiedResource: resourceInfo.GroupResource(),
DefaultQualifiedResource: gr,
SingularQualifiedResource: resourceInfo.SingularGroupResource(),
TableConvertor: resourceInfo.TableConverter(),
CreateStrategy: strategy,

View File

@@ -15,7 +15,6 @@ import (
_ "github.com/blugelabs/bluge"
_ "github.com/blugelabs/bluge_segment_api"
_ "github.com/crewjam/saml"
_ "github.com/docker/go-connections/nat"
_ "github.com/go-jose/go-jose/v4"
_ "github.com/gobwas/glob"
_ "github.com/googleapis/gax-go/v2"
@@ -31,7 +30,6 @@ import (
_ "github.com/spf13/cobra" // used by the standalone apiserver cli
_ "github.com/spyzhov/ajson"
_ "github.com/stretchr/testify/require"
_ "github.com/testcontainers/testcontainers-go"
_ "gocloud.dev/secrets/awskms"
_ "gocloud.dev/secrets/azurekeyvault"
_ "gocloud.dev/secrets/gcpkms"
@@ -56,7 +54,9 @@ import (
_ "github.com/grafana/e2e"
_ "github.com/grafana/gofpdf"
_ "github.com/grafana/gomemcache/memcache"
_ "github.com/grafana/tempo/pkg/traceql"
_ "github.com/grafana/grafana/apps/alerting/alertenrichment/pkg/apis/alertenrichment/v1beta1"
_ "github.com/grafana/grafana/apps/scope/pkg/apis/scope/v0alpha1"
_ "github.com/grafana/tempo/pkg/traceql"
_ "github.com/testcontainers/testcontainers-go"
)

View File

@@ -0,0 +1,113 @@
# Grafana CRD Support - POC Testing Guide
This directory contains example CRD definitions and custom resources for testing the Kubernetes CustomResourceDefinition (CRD) support in Grafana server.
## How to run
To run just compile Grafana with `make build-go` and then run the service with the `apiextensions.ini` provided under the `conf` folder.
```bash
./bin/darwin-arm64/grafana server --config conf/apiextensions.ini
```
To enable this feature we use the `apiExtensions = true` flag and also have unified storage as our storage backend.
There is no need for US service to run in ST Grafana, and the database will be a SQLite one.
## Testing Steps
### Step 1: Create a CustomResourceDefinition
Create the example CRD that defines a "Widget" resource:
```bash
kubectl apply -f ./pkg/registry/apis/apiextensions/resources/example-crd.yaml
```
Or use curl after you set a Grafana Service account token:
```bash
export AUTH_SVC="Authorization: Bearer glsa_<rest of the token>"
```
```bash
# From Grafana root
curl -k -X POST https://localhost:1111/apis/apiextensions.k8s.io/v1/customresourcedefinitions \
-H "$AUTH_SVC" \
-H "Content-Type: application/yaml" \
--data-binary @$PWD/pkg/registry/apis/apiextensions/resources/example-crd.yaml
```
### Step 2: Verify the CRD was created
List all CRDs:
```bash
# Be sure to use the generated kube-config file
KUBECONFIG=$PWD/data/grafana-apiserver/apiserver.kubeconfig \
kubectl get customresourcedefinitions.apiextensions.k8s.io
# Or with curl:
curl -k -X GET https://localhost:1111/apis/apiextensions.k8s.io/v1/customresourcedefinitions \
-H "$AUTH_SVC"
```
### Step 3: Create a Custom Resource Instance
Now that the CRD is registered, create an instance of the Widget resource:
```bash
kubectl apply -f ./pkg/registry/apis/apiextensions/resources/example-widget.yaml
```
Or use curl:
```bash
curl -k -X POST https://localhost:1111/apis/customcrdtest.grafana.app/v1/namespaces/default/widgets \
-H "$AUTH_SVC" \
-H "Content-Type: application/yaml" \
--data-binary @$PWD/pkg/registry/apis/apiextensions/resources/example-widget.yaml
```
### Step 4: Verify the Custom Resource
> Note: You can see all the resources in SQLite database called `grafana.db`
List all widgets:
```bash
KUBECONFIG=$PWD/data/grafana-apiserver/apiserver.kubeconfig \
kubectl get widgets -n default
# Or with curl:
curl -k -X GET https://localhost:1111/apis/customcrdtest.grafana.app/v1/namespaces/default/widgets \
-H "$AUTH_SVC" | jq .
```
### Step 5: Update the Custom Resource
Update the widget's spec:
```bash
KUBECONFIG=$PWD/data/grafana-apiserver/apiserver.kubeconfig \
kubectl edit widget my-widget -n default
# Or with curl (PATCH):
curl -X PATCH https://localhost:1111/apis/customcrdtest.grafana.app/v1/namespaces/default/widgets/my-widget \
-H "Content-Type: application/merge-patch+json" \
-H "$AUTH_SVC" \
-d '{"spec":{"replicas":5}}'
```
## What is left
- [ ] Support multiple versions of CRDs (example in `discovery.go`)
- [ ] Watch new CRDs, so we do not require server restart `dynamic_registry.go`. This is needed for horizontal deployments.
- [ ] Support `/status` subresource
- [ ] Add tracer and logger and remove `fmt.Print`
- [ ] Implement MT setup
- [ ] Figure out how to modify storage checks for Cluster scoped resources (when we create a new CRD)
- [ ] How to tackle Cluster scoped CRs
- [ ] Use the feature flag to start the `apiextensions` service on-demand

View File

@@ -0,0 +1,167 @@
package apiextensions
import (
"encoding/json"
"fmt"
"net/http"
"sync"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
)
// DiscoveryManager manages discovery for dynamically registered custom resources
type DiscoveryManager struct {
mu sync.RWMutex
apiGroups map[string]*metav1.APIGroup // group name -> APIGroup
resources map[string]*metav1.APIResourceList // group/version -> APIResourceList
}
// NewDiscoveryManager creates a new discovery manager
func NewDiscoveryManager() *DiscoveryManager {
return &DiscoveryManager{
apiGroups: make(map[string]*metav1.APIGroup),
resources: make(map[string]*metav1.APIResourceList),
}
}
// AddCustomResource adds a custom resource to the discovery documents
func (d *DiscoveryManager) AddCustomResource(crd *apiextensionsv1.CustomResourceDefinition) {
d.mu.Lock()
defer d.mu.Unlock()
group := crd.Spec.Group
// TODO(@konsalex): is this needed to be iterated
// or multiple versions will be different crd entries?
// Need to test this out
version := crd.Spec.Versions[0].Name
gvKey := fmt.Sprintf("%s/%s", group, version)
// Update API Group discovery
apiGroup, ok := d.apiGroups[group]
if !ok {
apiGroup = &metav1.APIGroup{
TypeMeta: metav1.TypeMeta{
Kind: "APIGroup",
APIVersion: "v1",
},
Name: group,
Versions: []metav1.GroupVersionForDiscovery{
{
GroupVersion: fmt.Sprintf("%s/%s", group, version),
Version: version,
},
},
PreferredVersion: metav1.GroupVersionForDiscovery{
GroupVersion: fmt.Sprintf("%s/%s", group, version),
Version: version,
},
}
d.apiGroups[group] = apiGroup
}
// Update API Resource List
resourceList, ok := d.resources[gvKey]
if !ok {
resourceList = &metav1.APIResourceList{
TypeMeta: metav1.TypeMeta{
Kind: "APIResourceList",
APIVersion: "v1",
},
GroupVersion: fmt.Sprintf("%s/%s", group, version),
APIResources: []metav1.APIResource{},
}
d.resources[gvKey] = resourceList
}
// Add the resource to the list
apiResource := metav1.APIResource{
Name: crd.Spec.Names.Plural,
SingularName: crd.Spec.Names.Singular,
Namespaced: crd.Spec.Scope == apiextensionsv1.NamespaceScoped,
Kind: crd.Spec.Names.Kind,
// TODO(@konsalex): Can this be dynamic ever?
// Need to validate
Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"},
ShortNames: crd.Spec.Names.ShortNames,
}
// Add status subresource if defined
if crd.Spec.Versions[0].Subresources != nil && crd.Spec.Versions[0].Subresources.Status != nil {
apiResource.Verbs = append(apiResource.Verbs, "update")
}
// Check if resource already exists
found := false
for i, r := range resourceList.APIResources {
if r.Name == apiResource.Name {
resourceList.APIResources[i] = apiResource
found = true
break
}
}
if !found {
resourceList.APIResources = append(resourceList.APIResources, apiResource)
}
}
// GetAPIGroupList returns the list of API groups for /apis discovery
func (d *DiscoveryManager) GetAPIGroupList() *metav1.APIGroupList {
d.mu.RLock()
defer d.mu.RUnlock()
groups := make([]metav1.APIGroup, 0, len(d.apiGroups))
for _, group := range d.apiGroups {
groups = append(groups, *group)
}
return &metav1.APIGroupList{
TypeMeta: metav1.TypeMeta{
Kind: "APIGroupList",
APIVersion: "v1",
},
Groups: groups,
}
}
// GetAPIGroup returns a specific API group
func (d *DiscoveryManager) GetAPIGroup(name string) *metav1.APIGroup {
d.mu.RLock()
defer d.mu.RUnlock()
return d.apiGroups[name]
}
// GetAPIResourceList returns the resource list for a specific group/version
func (d *DiscoveryManager) GetAPIResourceList(gv schema.GroupVersion) *metav1.APIResourceList {
d.mu.RLock()
defer d.mu.RUnlock()
key := fmt.Sprintf("%s/%s", gv.Group, gv.Version)
return d.resources[key]
}
// ServeAPIGroup handles requests to /apis/<group>
func (d *DiscoveryManager) ServeAPIGroup(w http.ResponseWriter, req *http.Request, group string) {
apiGroup := d.GetAPIGroup(group)
if apiGroup == nil {
http.NotFound(w, req)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(apiGroup)
}
// ServeAPIResourceList handles requests to /apis/<group>/<version>
func (d *DiscoveryManager) ServeAPIResourceList(w http.ResponseWriter, req *http.Request, gv schema.GroupVersion) {
resourceList := d.GetAPIResourceList(gv)
if resourceList == nil {
http.NotFound(w, req)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(resourceList)
}

View File

@@ -0,0 +1,451 @@
package apiextensions
import (
"context"
"fmt"
"io"
"mime"
"net/http"
"strings"
"sync"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
)
// DynamicCRHandler handles HTTP requests for dynamically registered custom resources
type DynamicCRHandler struct {
mu sync.RWMutex
// Map of group -> version -> resource -> storage
storageMap map[string]map[string]map[string]rest.Storage
// Map of group -> version -> resource -> scope (Namespaced or Cluster)
scopeMap map[string]map[string]map[string]apiextensionsv1.ResourceScope
scheme *runtime.Scheme
codecs serializer.CodecFactory
}
// NewDynamicCRHandler creates a new dynamic custom resource handler
func NewDynamicCRHandler(scheme *runtime.Scheme) *DynamicCRHandler {
return &DynamicCRHandler{
storageMap: make(map[string]map[string]map[string]rest.Storage),
scheme: scheme,
codecs: serializer.NewCodecFactory(scheme),
}
}
type registeredCR struct {
storage rest.Storage
scope apiextensionsv1.ResourceScope
}
// RegisterCustomResource registers a custom resource storage for dynamic routing
func (h *DynamicCRHandler) RegisterCustomResource(
crd *apiextensionsv1.CustomResourceDefinition,
version string,
storage rest.Storage,
) {
h.mu.Lock()
defer h.mu.Unlock()
group := crd.Spec.Group
resource := crd.Spec.Names.Plural
if h.storageMap[group] == nil {
h.storageMap[group] = make(map[string]map[string]rest.Storage)
}
if h.storageMap[group][version] == nil {
h.storageMap[group][version] = make(map[string]rest.Storage)
}
h.storageMap[group][version][resource] = storage
// Store the scope information
if h.scopeMap == nil {
h.scopeMap = make(map[string]map[string]map[string]apiextensionsv1.ResourceScope)
}
if h.scopeMap[group] == nil {
h.scopeMap[group] = make(map[string]map[string]apiextensionsv1.ResourceScope)
}
if h.scopeMap[group][version] == nil {
h.scopeMap[group][version] = make(map[string]apiextensionsv1.ResourceScope)
}
h.scopeMap[group][version][resource] = crd.Spec.Scope
fmt.Printf("DynamicCRHandler: Registered %s/%s/%s (scope: %s)\n", group, version, resource, crd.Spec.Scope)
}
// ServeHTTP handles HTTP requests for custom resources
func (h *DynamicCRHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// We could use a custom regex pattern, but RequestInfo is
// already available and gives us the match info for free.
requestInfo, ok := request.RequestInfoFrom(req.Context())
if !ok || requestInfo == nil {
http.NotFound(w, req)
return
}
group := requestInfo.APIGroup
version := requestInfo.APIVersion
namespace := requestInfo.Namespace
resource := requestInfo.Resource
name := requestInfo.Name
fmt.Printf("DynamicCRHandler: %s %s (group=%s, version=%s, resource=%s, namespace=%s, name=%s)\n",
req.Method, req.URL.Path, group, version, resource, namespace, name)
// Look up the storage
h.mu.RLock()
versionMap, groupExists := h.storageMap[group]
if !groupExists {
h.mu.RUnlock()
http.NotFound(w, req)
return
}
resourceMap, versionExists := versionMap[version]
if !versionExists {
h.mu.RUnlock()
http.NotFound(w, req)
return
}
storage, resourceExists := resourceMap[resource]
if !resourceExists {
h.mu.RUnlock()
http.NotFound(w, req)
return
}
// Check scope
scope := h.scopeMap[group][version][resource]
h.mu.RUnlock()
// Validate scope matches the request
if scope == apiextensionsv1.NamespaceScoped && namespace == "" {
http.Error(w, fmt.Sprintf("resource %s is namespace-scoped, must specify namespace", resource), http.StatusBadRequest)
return
}
if scope == apiextensionsv1.ClusterScoped && namespace != "" {
http.Error(w, fmt.Sprintf("resource %s is cluster-scoped, cannot specify namespace", resource), http.StatusBadRequest)
return
}
// Add request info to context
ctx := req.Context()
ctx = request.WithNamespace(ctx, namespace)
ctx = request.WithRequestInfo(ctx, &request.RequestInfo{
IsResourceRequest: true,
Path: req.URL.Path,
Verb: strings.ToLower(req.Method),
APIGroup: group,
APIVersion: version,
Namespace: namespace,
Resource: resource,
Name: name,
})
// Handle the request based on the storage interface
h.handleStorageRequest(ctx, w, req, storage, name, namespace)
}
// handleStorageRequest dispatches to the appropriate storage method
func (h *DynamicCRHandler) handleStorageRequest(
ctx context.Context,
w http.ResponseWriter,
req *http.Request,
storage rest.Storage,
name string,
namespace string,
) {
// Determine media type for response
_, serializerInfo, err := negotiation.NegotiateOutputMediaType(req, h.codecs, negotiation.DefaultEndpointRestrictions)
if err != nil {
http.Error(w, fmt.Sprintf("failed to negotiate media type: %v", err), http.StatusNotAcceptable)
return
}
serializer := serializerInfo.Serializer
switch req.Method {
case http.MethodGet:
if name == "" {
// List operation
if lister, ok := storage.(rest.Lister); ok {
h.handleList(ctx, w, req, lister, serializer)
} else {
http.Error(w, "list not supported", http.StatusMethodNotAllowed)
}
} else {
// Get operation
if getter, ok := storage.(rest.Getter); ok {
h.handleGet(ctx, w, req, getter, name, serializer)
} else {
http.Error(w, "get not supported", http.StatusMethodNotAllowed)
}
}
case http.MethodPost:
// Create operation
if creater, ok := storage.(rest.Creater); ok {
h.handleCreate(ctx, w, req, creater, serializer)
} else {
http.Error(w, "create not supported", http.StatusMethodNotAllowed)
}
case http.MethodPut:
// Update operation
if updater, ok := storage.(rest.Updater); ok {
h.handleUpdate(ctx, w, req, updater, name, serializer)
} else {
http.Error(w, "update not supported", http.StatusMethodNotAllowed)
}
case http.MethodPatch:
// Patch operation
if patcher, ok := storage.(rest.Patcher); ok {
h.handlePatch(ctx, w, req, patcher, name, serializer)
} else {
http.Error(w, "patch not supported", http.StatusMethodNotAllowed)
}
case http.MethodDelete:
// Delete operation
if deleter, ok := storage.(rest.GracefulDeleter); ok {
h.handleDelete(ctx, w, req, deleter, name, serializer)
} else {
http.Error(w, "delete not supported", http.StatusMethodNotAllowed)
}
default:
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
}
}
// Helper methods for each operation
func (h *DynamicCRHandler) handleList(ctx context.Context, w http.ResponseWriter, req *http.Request, lister rest.Lister, serializer runtime.Serializer) {
// TODO(@konsalex): Parse list options from query parameters
obj, err := lister.List(ctx, nil)
if err != nil {
http.Error(w, fmt.Sprintf("failed to list: %v", err), http.StatusInternalServerError)
return
}
h.writeResponse(w, obj, serializer)
}
func (h *DynamicCRHandler) handleGet(ctx context.Context, w http.ResponseWriter, req *http.Request, getter rest.Getter, name string, serializer runtime.Serializer) {
// TODO(@konsalex): Parse get options
obj, err := getter.Get(ctx, name, nil)
if err != nil {
http.Error(w, fmt.Sprintf("failed to get: %v", err), http.StatusInternalServerError)
return
}
h.writeResponse(w, obj, serializer)
}
func (h *DynamicCRHandler) handleCreate(ctx context.Context, w http.ResponseWriter, req *http.Request, creater rest.Creater, serializer runtime.Serializer) {
// Read the request body
body, err := io.ReadAll(req.Body)
if err != nil {
http.Error(w, fmt.Sprintf("failed to read body: %v", err), http.StatusBadRequest)
return
}
defer req.Body.Close()
fmt.Printf("DynamicCRHandler: handleCreate received body: %s\n", string(body))
// Decode the body
decoder := h.codecs.UniversalDeserializer()
obj, gvk, err := decoder.Decode(body, nil, nil)
if err != nil {
http.Error(w, fmt.Sprintf("failed to decode body: %v", err), http.StatusBadRequest)
return
}
fmt.Printf("DynamicCRHandler: Decoded object type: %T, GVK: %v\n", obj, gvk)
// Ensure we have an unstructured object
if obj == nil {
http.Error(w, "decoded object is nil", http.StatusBadRequest)
return
}
// Create the object
fmt.Printf("DynamicCRHandler: Calling creater.Create...\n")
created, err := creater.Create(ctx, obj, nil, nil)
if err != nil {
fmt.Printf("DynamicCRHandler: Create failed: %v\n", err)
http.Error(w, fmt.Sprintf("failed to create: %v", err), http.StatusInternalServerError)
return
}
fmt.Printf("DynamicCRHandler: Create succeeded, writing response\n")
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusCreated)
if err := serializer.Encode(created, w); err != nil {
fmt.Printf("DynamicCRHandler: Failed to encode response: %v\n", err)
}
}
func (h *DynamicCRHandler) handleUpdate(ctx context.Context, w http.ResponseWriter, req *http.Request, updater rest.Updater, name string, serializer runtime.Serializer) {
fmt.Printf("DynamicCRHandler: handleUpdate for resource: %s\n", name)
// Read the request body
body, err := io.ReadAll(req.Body)
if err != nil {
http.Error(w, fmt.Sprintf("failed to read body: %v", err), http.StatusBadRequest)
return
}
defer req.Body.Close()
// Decode the body
decoder := h.codecs.UniversalDeserializer()
obj, gvk, err := decoder.Decode(body, nil, nil)
if err != nil {
http.Error(w, fmt.Sprintf("failed to decode body: %v", err), http.StatusBadRequest)
return
}
fmt.Printf("DynamicCRHandler: Decoded object type: %T, GVK: %v\n", obj, gvk)
// Create UpdatedObjectInfo
objInfo := rest.DefaultUpdatedObjectInfo(obj)
// Update the object
updated, created, err := updater.Update(ctx, name, objInfo, nil, nil, false, nil)
if err != nil {
fmt.Printf("DynamicCRHandler: Update failed: %v\n", err)
if apierrors.IsNotFound(err) {
http.Error(w, fmt.Sprintf("resource not found: %v", err), http.StatusNotFound)
} else {
http.Error(w, fmt.Sprintf("failed to update: %v", err), http.StatusInternalServerError)
}
return
}
w.Header().Set("Content-Type", "application/json")
if created {
w.WriteHeader(http.StatusCreated)
} else {
w.WriteHeader(http.StatusOK)
}
fmt.Printf("DynamicCRHandler: Update succeeded\n")
if err := serializer.Encode(updated, w); err != nil {
fmt.Printf("DynamicCRHandler: Failed to encode response: %v\n", err)
}
}
func (h *DynamicCRHandler) handlePatch(ctx context.Context, w http.ResponseWriter, req *http.Request, patcher rest.Patcher, name string, serializer runtime.Serializer) {
// rest.Patcher is actually just Getter + Updater
// We need to get the existing object, apply the patch, then update
// Read the request body
patchBytes, err := io.ReadAll(req.Body)
if err != nil {
http.Error(w, fmt.Sprintf("failed to read body: %v", err), http.StatusBadRequest)
return
}
defer req.Body.Close()
// Get the existing object to verify it exists
_, err = patcher.Get(ctx, name, nil)
if err != nil {
if apierrors.IsNotFound(err) {
http.Error(w, fmt.Sprintf("resource not found: %v", err), http.StatusNotFound)
} else {
http.Error(w, fmt.Sprintf("failed to get resource: %v", err), http.StatusInternalServerError)
}
return
}
// Determine patch type from Content-Type header
// Using mime here as content type might be like:
// "Content-Type: application/json; charset=utf-8"
contentType, _, err := mime.ParseMediaType(req.Header.Get("Content-Type"))
if err != nil {
http.Error(w, fmt.Sprintf("error parsing Content-Type: %s", contentType), http.StatusUnsupportedMediaType)
return
}
var patchType types.PatchType
switch contentType {
case string(types.JSONPatchType):
patchType = types.JSONPatchType
case string(types.MergePatchType):
patchType = types.MergePatchType
case string(types.StrategicMergePatchType):
patchType = types.StrategicMergePatchType
case "application/json":
// Default to merge patch for plain JSON
// We cannot fall back to Strategic as we miss
// Go structs for dynamic CRDs
patchType = types.MergePatchType
default:
http.Error(w, fmt.Sprintf("unsupported Content-Type: %s", contentType), http.StatusUnsupportedMediaType)
return
}
// Apply the patch via our custom storage (which has the Patch method)
if storage, ok := patcher.(*customResourceStorage); ok {
patched, err := storage.Patch(ctx, name, patchType, patchBytes, nil)
if err != nil {
if apierrors.IsNotFound(err) {
http.Error(w, fmt.Sprintf("resource not found: %v", err), http.StatusNotFound)
} else {
http.Error(w, fmt.Sprintf("failed to patch: %v", err), http.StatusInternalServerError)
}
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
if err := serializer.Encode(patched, w); err != nil {
fmt.Printf("DynamicCRHandler: Failed to encode response: %v\n", err)
}
} else {
http.Error(w, "patch not supported for this resource type", http.StatusNotImplemented)
}
}
func (h *DynamicCRHandler) handleDelete(ctx context.Context, w http.ResponseWriter, req *http.Request, deleter rest.GracefulDeleter, name string, serializer runtime.Serializer) {
// Parse delete options from query/body
deleteOptions := &metav1.DeleteOptions{}
// Delete the object
obj, deleted, err := deleter.Delete(ctx, name, nil, deleteOptions)
if err != nil {
if apierrors.IsNotFound(err) {
http.Error(w, fmt.Sprintf("resource not found: %v", err), http.StatusNotFound)
} else {
http.Error(w, fmt.Sprintf("failed to delete: %v", err), http.StatusInternalServerError)
}
return
}
w.Header().Set("Content-Type", "application/json")
if deleted {
w.WriteHeader(http.StatusOK)
} else {
// Return 202 Accepted if deletion is pending (e.g., finalizers)
w.WriteHeader(http.StatusAccepted)
}
if err := serializer.Encode(obj, w); err != nil {
fmt.Printf("DynamicCRHandler: Failed to encode response: %v\n", err)
}
}
func (h *DynamicCRHandler) writeResponse(w http.ResponseWriter, obj runtime.Object, serializer runtime.Serializer) {
w.Header().Set("Content-Type", "application/json")
if err := serializer.Encode(obj, w); err != nil {
http.Error(w, fmt.Sprintf("failed to encode response: %v", err), http.StatusInternalServerError)
}
}

View File

@@ -0,0 +1,452 @@
package apiextensions
import (
"context"
"fmt"
"net/http"
"sync"
authlib "github.com/grafana/authlib/types"
apidiscoveryv2 "k8s.io/api/apidiscovery/v2"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apiextensions-apiserver/pkg/controller/openapi/builder"
apiextensionsfeatures "k8s.io/apiextensions-apiserver/pkg/features"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/endpoints/discovery/aggregated"
"k8s.io/apiserver/pkg/registry/generic"
genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
"k8s.io/apiserver/pkg/registry/rest"
genericapiserver "k8s.io/apiserver/pkg/server"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kube-openapi/pkg/spec3"
"github.com/grafana/grafana/pkg/storage/unified/resource"
)
// DynamicRegistry manages the dynamic registration and unregistration of custom resources
type DynamicRegistry struct {
scheme *runtime.Scheme
optsGetter generic.RESTOptionsGetter
unifiedClient resource.ResourceClient
apiGroupInfo *genericapiserver.APIGroupInfo
accessClient authlib.AccessClient
server *genericapiserver.GenericAPIServer // API server to install new groups
discoveryManager *DiscoveryManager
mu sync.RWMutex
registrations map[string]*customResourceRegistration // key: group/version/resource
apiGroups map[string]*genericapiserver.APIGroupInfo // key: group name
// openAPISpecs caches OpenAPI specs per GroupVersion and CRD Name
openAPISpecs map[schema.GroupVersion]map[string]*spec3.OpenAPI
}
type customResourceRegistration struct {
crd *apiextensionsv1.CustomResourceDefinition
storage *customResourceStorage
}
// NewDynamicRegistry creates a new dynamic registry
func NewDynamicRegistry(
scheme *runtime.Scheme,
optsGetter generic.RESTOptionsGetter,
apiGroupInfo *genericapiserver.APIGroupInfo,
accessClient authlib.AccessClient,
) *DynamicRegistry {
return &DynamicRegistry{
scheme: scheme,
optsGetter: optsGetter,
apiGroupInfo: apiGroupInfo,
accessClient: accessClient,
discoveryManager: NewDiscoveryManager(),
registrations: make(map[string]*customResourceRegistration),
apiGroups: make(map[string]*genericapiserver.APIGroupInfo),
openAPISpecs: make(map[schema.GroupVersion]map[string]*spec3.OpenAPI),
}
}
// SetAPIServer sets the API server for dynamic group installation
func (r *DynamicRegistry) SetAPIServer(server *genericapiserver.GenericAPIServer) {
r.mu.Lock()
defer r.mu.Unlock()
r.server = server
// Register discovery handlers for any API groups that were registered before we had the server reference
for groupName := range r.apiGroups {
// Find the version for this group
for _, reg := range r.registrations {
if reg.crd.Spec.Group == groupName {
versionName := reg.crd.Spec.Versions[0].Name
r.registerDiscoveryHandlers(groupName, versionName)
break
}
}
}
// Wrap the main /apis handler to include our custom groups
r.registerAllWithAggregatedDiscovery()
// Note: OpenAPI registration is deferred to a PostStartHook because
// OpenAPIV3VersionedService is only available after PrepareRun()
}
// RegisterOpenAPIForExistingCRDs registers OpenAPI specs for all existing CRDs
// This is called from a PostStartHook after the server is fully prepared
func (r *DynamicRegistry) RegisterOpenAPIForExistingCRDs() error {
r.mu.Lock()
defer r.mu.Unlock()
// Register OpenAPI specs for all CRDs that were registered before the server was fully prepared
for _, reg := range r.registrations {
if err := r.updateOpenAPISpecLocked(reg.crd); err != nil {
fmt.Printf("Warning: failed to update OpenAPI spec for CRD %s: %v\n", reg.crd.Name, err)
}
}
return nil
}
// registerDiscoveryHandlers registers HTTP handlers for discovery endpoints
func (r *DynamicRegistry) registerDiscoveryHandlers(group, version string) {
if r.server == nil {
return
}
// Register /apis/<group> handler
groupPath := fmt.Sprintf("/apis/%s", group)
r.server.Handler.NonGoRestfulMux.HandleFunc(groupPath, func(w http.ResponseWriter, req *http.Request) {
r.discoveryManager.ServeAPIGroup(w, req, group)
})
// DEBUG
fmt.Printf("Registered discovery handler: %s\n", groupPath)
// Register /apis/<group>/<version> handler
gv := schema.GroupVersion{Group: group, Version: version}
versionPath := fmt.Sprintf("/apis/%s/%s", group, version)
r.server.Handler.NonGoRestfulMux.HandleFunc(versionPath, func(w http.ResponseWriter, req *http.Request) {
r.discoveryManager.ServeAPIResourceList(w, req, gv)
})
fmt.Printf("Registered discovery handler: %s\n", versionPath)
}
// registerAllWithAggregatedDiscovery registers all custom groups with the server's aggregated discovery manager
func (r *DynamicRegistry) registerAllWithAggregatedDiscovery() {
if r.server == nil || r.server.AggregatedDiscoveryGroupManager == nil {
return
}
// Register each custom group with the discovery manager
for _, apiGroup := range r.discoveryManager.apiGroups {
for _, gvDiscovery := range apiGroup.Versions {
r.registerWithAggregatedDiscovery(apiGroup.Name, gvDiscovery.Version)
}
}
}
// registerWithAggregatedDiscovery registers a specific group version with the aggregated discovery manager
func (r *DynamicRegistry) registerWithAggregatedDiscovery(groupName, versionName string) {
if r.server == nil || r.server.AggregatedDiscoveryGroupManager == nil {
return
}
// Get the resources for this version
gvKey := fmt.Sprintf("%s/%s", groupName, versionName)
resourceList := r.discoveryManager.resources[gvKey]
if resourceList != nil {
// Convert our metav1.APIResourceList to apidiscoveryv2.APIVersionDiscovery
apiResources := make([]apidiscoveryv2.APIResourceDiscovery, 0, len(resourceList.APIResources))
for _, res := range resourceList.APIResources {
apiRes := apidiscoveryv2.APIResourceDiscovery{
Resource: res.Name,
ResponseKind: &metav1.GroupVersionKind{
Group: groupName,
Version: versionName,
Kind: res.Kind,
},
Scope: getScopeType(res.Namespaced),
ShortNames: res.ShortNames,
Verbs: res.Verbs,
}
apiResources = append(apiResources, apiRes)
}
versionDiscovery := apidiscoveryv2.APIVersionDiscovery{
Version: versionName,
Resources: apiResources,
}
// Create a manager with CRD source
manager := r.server.AggregatedDiscoveryGroupManager.WithSource(aggregated.CRDSource)
// Add group version
manager.AddGroupVersion(
groupName,
versionDiscovery,
)
// Set priority to ensure it's discoverable
gv := metav1.GroupVersion{
Group: groupName,
Version: versionName,
}
manager.SetGroupVersionPriority(gv, 1000, 100)
fmt.Printf("✅ Registered %s with AggregatedDiscoveryGroupManager (Source: CRD)\n", gvKey)
}
}
// getScopeType converts boolean namespaced flag to apidiscoveryv2 scope type
func getScopeType(namespaced bool) apidiscoveryv2.ResourceScope {
if namespaced {
return apidiscoveryv2.ScopeNamespace
}
return apidiscoveryv2.ScopeCluster
}
// Start begins watching CRDs for dynamic registration
func (r *DynamicRegistry) Start(ctx context.Context, crdStore *genericregistry.Store) {
// Currently we don't watch for changes
// CRDs are loaded during UpdateAPIGroupInfo before the server starts
// We need to implement proper watching mechanism here.
// TODO(@konsalex): Watch for CRD changes and call RegisterCRD/UpdateCRD/UnregisterCRD
<-ctx.Done()
}
// RegisterCRD registers a custom resource dynamically based on the CRD spec
func (r *DynamicRegistry) RegisterCRD(crd *apiextensionsv1.CustomResourceDefinition) error {
r.mu.Lock()
defer r.mu.Unlock()
// TODO(@konsalex): Support multiple versions
if len(crd.Spec.Versions) != 1 {
return fmt.Errorf("only single-version CRDs are supported")
}
version := crd.Spec.Versions[0]
// Create storage for the custom resource
crStorage, err := NewCustomResourceStorage(
crd,
version.Name,
r.scheme,
r.optsGetter,
r.accessClient,
r.unifiedClient,
)
if err != nil {
return fmt.Errorf("failed to create custom resource storage: %w", err)
}
// Get or create API group info for this custom resource's group
group := crd.Spec.Group
apiGroupInfo, ok := r.apiGroups[group]
isNewGroup := !ok
if !ok {
// Create a simple API group info without using NewDefaultAPIGroupInfo
// to avoid OpenAPI validation issues with unstructured types
gvInfo := genericapiserver.APIGroupInfo{
PrioritizedVersions: []schema.GroupVersion{{Group: group, Version: version.Name}},
VersionedResourcesStorageMap: make(map[string]map[string]rest.Storage),
Scheme: r.scheme,
NegotiatedSerializer: r.apiGroupInfo.NegotiatedSerializer,
ParameterCodec: r.apiGroupInfo.ParameterCodec,
}
r.apiGroups[group] = &gvInfo
apiGroupInfo = &gvInfo
}
// Get or create the storage map for this version
storageMap, ok := apiGroupInfo.VersionedResourcesStorageMap[version.Name]
if !ok {
storageMap = make(map[string]rest.Storage)
apiGroupInfo.VersionedResourcesStorageMap[version.Name] = storageMap
// Update prioritized versions if this is a new version
found := false
for _, gv := range apiGroupInfo.PrioritizedVersions {
if gv.Version == version.Name {
found = true
break
}
}
if !found {
apiGroupInfo.PrioritizedVersions = append(apiGroupInfo.PrioritizedVersions,
schema.GroupVersion{Group: group, Version: version.Name})
}
}
// Register the custom resource storage
resourcePath := crd.Spec.Names.Plural
storageMap[resourcePath] = crStorage
// Register status subresource if defined
// TODO(@konsalex): Implement status subresource for direct storage
// if version.Subresources != nil && version.Subresources.Status != nil {
// storageMap[resourcePath+"/status"] = crStorage
// }
// Store the registration
key := fmt.Sprintf("%s/%s/%s", crd.Spec.Group, version.Name, crd.Spec.Names.Plural)
r.registrations[key] = &customResourceRegistration{
crd: crd.DeepCopy(),
storage: crStorage,
}
// Add to discovery manager
// (this will make kubectl work properly)
r.discoveryManager.AddCustomResource(crd)
// Register discovery HTTP handlers if we have the server
if r.server != nil && isNewGroup {
r.registerDiscoveryHandlers(group, version.Name)
// Also register with aggregated discovery manager
r.registerWithAggregatedDiscovery(group, version.Name)
}
// Update OpenAPI spec
if err := r.updateOpenAPISpecLocked(crd); err != nil {
fmt.Printf("Warning: failed to update OpenAPI spec for CRD %s: %v\n", crd.Name, err)
}
return nil
}
// UpdateCRD updates a custom resource registration
func (r *DynamicRegistry) UpdateCRD(crd *apiextensionsv1.CustomResourceDefinition) error {
r.mu.Lock()
defer r.mu.Unlock()
// Hack for now, we just un-register it and the re-register it.
// Not sure if there is any drawback
if err := r.unregisterCRDLocked(crd); err != nil {
return err
}
return r.RegisterCRD(crd)
}
// UnregisterCRD removes a custom resource registration
func (r *DynamicRegistry) UnregisterCRD(crd *apiextensionsv1.CustomResourceDefinition) error {
r.mu.Lock()
defer r.mu.Unlock()
return r.unregisterCRDLocked(crd)
}
func (r *DynamicRegistry) unregisterCRDLocked(crd *apiextensionsv1.CustomResourceDefinition) error {
if len(crd.Spec.Versions) != 1 {
return fmt.Errorf("only single-version CRDs are supported")
}
version := crd.Spec.Versions[0]
key := fmt.Sprintf("%s/%s/%s", crd.Spec.Group, version.Name, crd.Spec.Names.Plural)
// Remove from registrations
delete(r.registrations, key)
// Remove from API group info storage map
if storageMap, ok := r.apiGroupInfo.VersionedResourcesStorageMap[version.Name]; ok {
resourcePath := crd.Spec.Names.Plural
delete(storageMap, resourcePath)
delete(storageMap, resourcePath+"/status")
}
// Remove from OpenAPI specs
gv := schema.GroupVersion{Group: crd.Spec.Group, Version: version.Name}
if specs, ok := r.openAPISpecs[gv]; ok {
delete(specs, crd.Name)
if len(specs) == 0 {
delete(r.openAPISpecs, gv)
// Remove from service
if r.server != nil && r.server.OpenAPIV3VersionedService != nil {
path := fmt.Sprintf("apis/%s/%s", gv.Group, gv.Version)
r.server.OpenAPIV3VersionedService.DeleteGroupVersion(path)
}
} else {
// Update with remaining specs
if err := r.updateGroupVersionOpenAPILocked(gv); err != nil {
return fmt.Errorf("failed to update OpenAPI spec after unregistering CRD: %w", err)
}
}
}
return nil
}
// GetRegistration returns the registration for a custom resource
func (r *DynamicRegistry) GetRegistration(group, version, resource string) (*customResourceRegistration, bool) {
r.mu.RLock()
defer r.mu.RUnlock()
key := fmt.Sprintf("%s/%s/%s", group, version, resource)
reg, ok := r.registrations[key]
return reg, ok
}
// updateOpenAPISpecLocked builds and updates the OpenAPI spec for the given CRD
// mu must be held by caller
func (r *DynamicRegistry) updateOpenAPISpecLocked(crd *apiextensionsv1.CustomResourceDefinition) error {
if r.server == nil || r.server.OpenAPIV3VersionedService == nil {
return nil
}
for _, v := range crd.Spec.Versions {
if !v.Served {
continue
}
// Build OpenAPI V3 spec
spec, err := builder.BuildOpenAPIV3(crd, v.Name, builder.Options{
V2: false,
IncludeSelectableFields: utilfeature.DefaultFeatureGate.Enabled(apiextensionsfeatures.CustomResourceFieldSelectors),
})
if err != nil {
return fmt.Errorf("failed to build OpenAPI V3 spec: %w", err)
}
gv := schema.GroupVersion{Group: crd.Spec.Group, Version: v.Name}
if r.openAPISpecs[gv] == nil {
r.openAPISpecs[gv] = make(map[string]*spec3.OpenAPI)
}
r.openAPISpecs[gv][crd.Name] = spec
// Update the group version spec in the service
if err := r.updateGroupVersionOpenAPILocked(gv); err != nil {
return err
}
}
return nil
}
// updateGroupVersionOpenAPILocked merges all specs for a GV and updates the service
// mu must be held by caller
func (r *DynamicRegistry) updateGroupVersionOpenAPILocked(gv schema.GroupVersion) error {
if r.server == nil || r.server.OpenAPIV3VersionedService == nil {
return nil
}
specsMap := r.openAPISpecs[gv]
if len(specsMap) == 0 {
return nil
}
var specs []*spec3.OpenAPI
for _, spec := range specsMap {
specs = append(specs, spec)
}
mergedSpec, err := builder.MergeSpecsV3(specs...)
if err != nil {
return fmt.Errorf("failed to merge specs: %w", err)
}
path := fmt.Sprintf("apis/%s/%s", gv.Group, gv.Version)
r.server.OpenAPIV3VersionedService.UpdateGroupVersion(path, mergedSpec)
return nil
}

View File

@@ -0,0 +1,309 @@
package apiextensions
import (
"context"
"fmt"
"net/http"
"github.com/prometheus/client_golang/prometheus"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiextensionsopenapi "k8s.io/apiextensions-apiserver/pkg/generated/openapi"
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/authorization/authorizer"
genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
"k8s.io/apiserver/pkg/registry/rest"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/kube-openapi/pkg/common"
authlib "github.com/grafana/authlib/types"
"github.com/grafana/grafana/pkg/apimachinery/identity"
"github.com/grafana/grafana/pkg/apimachinery/utils"
grafanaregistry "github.com/grafana/grafana/pkg/apiserver/registry/generic"
grafanaauthorizer "github.com/grafana/grafana/pkg/services/apiserver/auth/authorizer"
"github.com/grafana/grafana/pkg/services/apiserver/builder"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/storage/unified/apistore"
"github.com/grafana/grafana/pkg/storage/unified/resource"
)
var _ builder.APIGroupBuilder = (*APIExtensionsBuilder)(nil)
// APIExtensionsBuilder implements builder.APIGroupBuilder for CustomResourceDefinitions
type APIExtensionsBuilder struct {
features featuremgmt.FeatureToggles
storage *genericregistry.Store
accessClient authlib.AccessClient
dynamicReg *DynamicRegistry
restOptGetter *apistore.RESTOptionsGetter
apiregistrar builder.APIRegistrar
unifiedClient resource.ResourceClient
preloadedCRDs []*apiextensionsv1.CustomResourceDefinition // CRDs loaded before init
dynamicHandler *DynamicCRHandler // Dynamic handler for custom resources
server *genericapiserver.GenericAPIServer // The running API server
}
// SetAPIServer sets the API server instance
// This allows the builder to register dynamic API groups (CRDs)
func (b *APIExtensionsBuilder) SetAPIServer(server *genericapiserver.GenericAPIServer) {
b.server = server
if b.dynamicReg != nil {
b.dynamicReg.SetAPIServer(server)
}
// Register a PostStartHook to register OpenAPI specs after the server is fully prepared
// This is necessary because OpenAPIV3VersionedService is only available after PrepareRun()
server.AddPostStartHookOrDie("apiextensions-openapi", func(context genericapiserver.PostStartHookContext) error {
if b.dynamicReg != nil {
return b.dynamicReg.RegisterOpenAPIForExistingCRDs()
}
return nil
})
}
// RegisterAPIService registers the apiextensions API group in single-tenant mode
func RegisterAPIService(
cfg *setting.Cfg,
features featuremgmt.FeatureToggles,
apiregistration builder.APIRegistrar,
accessClient authlib.AccessClient,
registerer prometheus.Registerer,
unified resource.ResourceClient,
) (*APIExtensionsBuilder, error) {
if !features.IsEnabledGlobally(featuremgmt.FlagApiExtensions) {
return nil, nil
}
b := &APIExtensionsBuilder{
features: features,
accessClient: accessClient,
apiregistrar: apiregistration,
unifiedClient: unified,
}
// Register the CRD API group
apiregistration.RegisterAPI(b)
// NOTE: We can't load CRDs here because unified storage isn't fully initialized yet
// We'll use a PostStartHook instead to load CRDs after the server is running
// See the postStartHook field and RegisterPostStartHooks method below
return b, nil
}
// The default authorizer is fine because authorization happens in storage where we know the parent folder
func (b *APIExtensionsBuilder) GetAuthorizer() authorizer.Authorizer {
return grafanaauthorizer.NewServiceAuthorizer()
}
// loadAndRegisterCRDsWithDynamicHandler loads CRDs from storage and registers their handlers
// This is called during UpdateAPIGroupInfo when storage IS ready
func (b *APIExtensionsBuilder) loadAndRegisterCRDsWithDynamicHandler(
ctx context.Context,
crdStore *genericregistry.Store,
opts builder.APIGroupOptions,
) error {
// TODO(@konsalex): Have a conditional check here for MT
// For ST we can use the identity.WithServiceIdentityContext
// for MT we can use implicitly use a service token:
// https://github.com/grafana/kube-manifests/blob/9f5e409c72fef4f831b173480131121e9cb348a3/flux/dev-us-east-0/grafana-iam/Deployment-iam-grafana-app-main.yaml#L114C9-L114C59
// SO we can skip systemCtx creation
systemCtx := identity.WithServiceIdentityContext(context.WithoutCancel(ctx), 1)
// List all CRDs using the initialized storage
listObj, err := crdStore.List(systemCtx, &metainternalversion.ListOptions{})
if err != nil {
return fmt.Errorf("failed to list CRDs: %w", err)
}
crdList, ok := listObj.(*apiextensionsv1.CustomResourceDefinitionList)
if !ok {
return fmt.Errorf("unexpected list type: %T", listObj)
}
if len(crdList.Items) == 0 {
fmt.Println("No existing CRDs found in storage")
return nil
}
fmt.Printf("Found %d CRDs in storage, registering with dynamic handler...\n", len(crdList.Items))
// For each CRD, create storage and register it with the dynamic handler
for i := range crdList.Items {
crd := &crdList.Items[i]
fmt.Printf(" - Processing CRD: %s (group: %s, version: %s, resource: %s)\n",
crd.Name, crd.Spec.Group, crd.Spec.Versions[0].Name, crd.Spec.Names.Plural)
// Support only single version for now
if len(crd.Spec.Versions) != 1 {
fmt.Printf(" Warning: only single-version CRDs supported, skipping %s\n", crd.Name)
continue
}
version := crd.Spec.Versions[0]
// Create storage for this custom resource
crStorage, err := NewCustomResourceStorage(
crd,
version.Name,
opts.Scheme,
opts.OptsGetter,
b.accessClient,
b.unifiedClient,
)
if err != nil {
fmt.Printf(" Warning: failed to create storage for CRD %s: %v\n", crd.Name, err)
continue
}
// Register with the dynamic handler (for HTTP routing)
b.dynamicHandler.RegisterCustomResource(crd, version.Name, crStorage)
// Register with the dynamic registry (for API discovery)
fmt.Printf(" DEBUG: Registering CRD with dynamic registry (b.dynamicReg=%v)...\n", b.dynamicReg != nil)
if err := b.dynamicReg.RegisterCRD(crd); err != nil {
fmt.Printf(" Warning: failed to register CRD with dynamic registry: %v\n", err)
} else {
fmt.Printf(" ✓ Registered CRD with dynamic registry\n")
}
fmt.Printf(" ✓ Registered with dynamic handler: %s/%s/%s\n",
crd.Spec.Group, version.Name, crd.Spec.Names.Plural)
}
return nil
}
// NewAPIService creates an APIExtensionsBuilder for multi-tenant mode
// TODO(@konsalex): NOT YET IMPLEMENTED properly
func NewAPIService(
accessClient authlib.AccessClient,
unified resource.ResourceClient,
registerer prometheus.Registerer,
features featuremgmt.FeatureToggles,
) (*APIExtensionsBuilder, error) {
return &APIExtensionsBuilder{
features: features,
accessClient: accessClient,
}, nil
}
func (b *APIExtensionsBuilder) GetGroupVersion() schema.GroupVersion {
return apiextensionsv1.SchemeGroupVersion
}
func (b *APIExtensionsBuilder) InstallSchema(scheme *runtime.Scheme) error {
gv := b.GetGroupVersion()
// We don't register CRD types with AddKnownTypes here because:
// 1. The external k8s.io/apiextensions-apiserver types don't have openapi-gen annotations
// 2. This would cause the API server to try to generate OpenAPI specs for them
// 3. We register them at the storage level in UpdateAPIGroupInfo instead
// We only need to add the metav1 types to this group version
metav1.AddToGroupVersion(scheme, gv)
return scheme.SetVersionPriority(gv)
}
func (b *APIExtensionsBuilder) AllowedV0Alpha1Resources() []string {
return nil
}
// GetDynamicHandler returns the dynamic custom resource handler
// This can be used to install it as a fallback handler in the HTTP server
func (b *APIExtensionsBuilder) GetDynamicHandler() http.Handler {
if b.dynamicHandler == nil {
return nil
}
return b.dynamicHandler
}
func (b *APIExtensionsBuilder) UpdateAPIGroupInfo(
apiGroupInfo *genericapiserver.APIGroupInfo,
opts builder.APIGroupOptions,
) error {
// Register storage options for CRDs
opts.StorageOptsRegister(
schema.GroupResource{Group: apiextensionsv1.GroupName, Resource: "customresourcedefinitions"},
apistore.StorageOptions{},
)
// Register the CRD types directly with the scheme now (at storage registration time)
// This avoids OpenAPI generation issues during schema installation
gv := b.GetGroupVersion()
opts.Scheme.AddKnownTypes(gv,
&apiextensionsv1.CustomResourceDefinition{},
&apiextensionsv1.CustomResourceDefinitionList{},
)
// Create the main CRD storage
crdResourceInfo := utils.NewResourceInfo(
apiextensionsv1.GroupName,
"v1",
"customresourcedefinitions",
"customresourcedefinition",
"CustomResourceDefinition",
func() runtime.Object { return &apiextensionsv1.CustomResourceDefinition{} },
func() runtime.Object { return &apiextensionsv1.CustomResourceDefinitionList{} },
utils.TableColumns{},
)
crdResourceInfoWithScope := crdResourceInfo.WithClusterScope()
unified, err := grafanaregistry.NewRegistryStore(
opts.Scheme,
crdResourceInfoWithScope,
opts.OptsGetter,
)
if err != nil {
return fmt.Errorf("failed to create CRD storage: %w", err)
}
b.storage = unified
b.restOptGetter = opts.OptsGetter.(*apistore.RESTOptionsGetter)
// Initialize dynamic registry for custom resources
b.dynamicReg = NewDynamicRegistry(
opts.Scheme,
opts.OptsGetter,
apiGroupInfo,
b.accessClient,
)
if b.server != nil {
b.dynamicReg.SetAPIServer(b.server)
}
// Create the dynamic handler for custom resources
b.dynamicHandler = NewDynamicCRHandler(opts.Scheme)
// NOW storage is ready, load CRDs and register their handlers dynamically
if err := b.loadAndRegisterCRDsWithDynamicHandler(context.Background(), unified, opts); err != nil {
// TODO(@konsalex): use logger here
fmt.Printf("failed to load and register CRDs: %v\n", err)
// Don't fail - CRDs can be created later
}
// Start watching CRDs for changes (WIP)
go b.dynamicReg.Start(context.Background(), unified)
storage := map[string]rest.Storage{}
storage["customresourcedefinitions"] = &crdStorage{
Store: unified,
dynamicReg: b.dynamicReg,
}
storage["customresourcedefinitions/status"] = &crdStatusStorage{Store: unified}
apiGroupInfo.VersionedResourcesStorageMap[apiextensionsv1.SchemeGroupVersion.Version] = storage
return nil
}
func (b *APIExtensionsBuilder) GetOpenAPIDefinitions() common.GetOpenAPIDefinitions {
return func(ref common.ReferenceCallback) map[string]common.OpenAPIDefinition {
return apiextensionsopenapi.GetOpenAPIDefinitions(ref)
}
}

View File

@@ -0,0 +1,69 @@
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: widgets.customcrdtest.grafana.app
spec:
group: customcrdtest.grafana.app
names:
kind: Widget
listKind: WidgetList
plural: widgets
singular: widget
shortNames:
- wg
scope: Namespaced
versions:
- name: v1
served: true
storage: true
schema:
openAPIV3Schema:
type: object
properties:
apiVersion:
type: string
kind:
type: string
metadata:
type: object
spec:
type: object
properties:
size:
type: string
enum:
- small
- medium
- large
replicas:
type: integer
minimum: 1
maximum: 10
required:
- size
status:
type: object
properties:
ready:
type: boolean
message:
type: string
subresources:
status: {}
additionalPrinterColumns:
- name: Size
type: string
description: The size of the widget
jsonPath: .spec.size
- name: Replicas
type: integer
description: Number of replicas
jsonPath: .spec.replicas
- name: Ready
type: boolean
description: Is the widget ready?
jsonPath: .status.ready
- name: Age
type: date
jsonPath: .metadata.creationTimestamp

View File

@@ -0,0 +1,8 @@
apiVersion: customcrdtest.grafana.app/v1
kind: Widget
metadata:
name: my-widget
namespace: default
spec:
size: medium
replicas: 3

View File

@@ -0,0 +1,111 @@
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: outages.monitoring.grafana.app
spec:
group: monitoring.grafana.app
names:
kind: Outage
listKind: OutageList
plural: outages
singular: outage
shortNames:
- out
scope: Cluster # ← CLUSTER-SCOPED (not namespaced)
versions:
- name: v1
served: true
storage: true
schema:
openAPIV3Schema:
type: object
properties:
apiVersion:
type: string
kind:
type: string
metadata:
type: object
spec:
type: object
properties:
region:
type: string
description: Geographic region affected by the outage
enum:
- us-east-1
- us-west-2
- eu-west-1
- eu-central-1
- ap-southeast-1
- ap-northeast-1
severity:
type: string
description: Severity level of the outage
enum:
- critical
- major
- minor
default: major
affectedServices:
type: array
description: List of services affected
items:
type: string
startTime:
type: string
format: date-time
description: When the outage started
description:
type: string
description: Description of the outage
required:
- region
- severity
- startTime
status:
type: object
properties:
resolved:
type: boolean
description: Whether the outage has been resolved
resolvedAt:
type: string
format: date-time
description: When the outage was resolved
affectedCustomers:
type: integer
description: Number of customers affected
updates:
type: array
description: Status updates
items:
type: object
properties:
timestamp:
type: string
format: date-time
message:
type: string
subresources:
status: {}
additionalPrinterColumns:
- name: Region
type: string
description: Geographic region
jsonPath: .spec.region
- name: Severity
type: string
description: Severity level
jsonPath: .spec.severity
- name: Resolved
type: boolean
description: Resolution status
jsonPath: .status.resolved
- name: Start Time
type: date
description: When the outage started
jsonPath: .spec.startTime
- name: Age
type: date
jsonPath: .metadata.creationTimestamp

View File

@@ -0,0 +1,22 @@
apiVersion: monitoring.grafana.app/v1
kind: Outage
metadata:
name: outage-2025-11-20-us-east
# NO namespace field - this is cluster-scoped!
spec:
region: us-east-1
severity: critical
affectedServices:
- grafana-cloud-metrics
- grafana-cloud-logs
- grafana-cloud-traces
startTime: "2025-11-20T10:00:00Z"
description: "Database connectivity issues affecting multiple services in US East region"
status:
resolved: false
affectedCustomers: 1247
updates:
- timestamp: "2025-11-20T10:15:00Z"
message: "Incident detected, investigating database connectivity"
- timestamp: "2025-11-20T10:30:00Z"
message: "Root cause identified, applying fix to primary database cluster"

View File

@@ -0,0 +1,176 @@
package apiextensions
import (
"context"
"fmt"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
"k8s.io/apiserver/pkg/registry/rest"
)
var _ rest.StandardStorage = (*crdStorage)(nil)
var _ rest.Scoper = (*crdStorage)(nil)
// crdStorage wraps the generic registry store and adds CRD-specific validation and dynamic registration
type crdStorage struct {
*genericregistry.Store
dynamicReg *DynamicRegistry
}
// NamespaceScoped returns false since CRDs are cluster-scoped resources
func (s *crdStorage) NamespaceScoped() bool {
return false
}
// Create validates and creates a CRD, then triggers dynamic registration of the custom resource
func (s *crdStorage) Create(
ctx context.Context,
obj runtime.Object,
createValidation rest.ValidateObjectFunc,
options *metav1.CreateOptions,
) (runtime.Object, error) {
crd, ok := obj.(*apiextensionsv1.CustomResourceDefinition)
if !ok {
return nil, apierrors.NewBadRequest("object is not a CustomResourceDefinition")
}
if err := validateCRDForPhase1(crd); err != nil {
return nil, err
}
// Create the CRD in storage
result, err := s.Store.Create(ctx, obj, createValidation, options)
if err != nil {
return nil, err
}
// Register the custom resource dynamically
createdCRD, ok := result.(*apiextensionsv1.CustomResourceDefinition)
if ok {
if err := s.dynamicReg.RegisterCRD(createdCRD); err != nil {
// Log error but don't fail the creation
// TODO: Add proper logging
fmt.Printf("Warning: failed to register CRD dynamically: %v\n", err)
}
}
return result, nil
}
// Update validates and updates a CRD, then updates the dynamic registration
func (s *crdStorage) Update(
ctx context.Context,
name string,
objInfo rest.UpdatedObjectInfo,
createValidation rest.ValidateObjectFunc,
updateValidation rest.ValidateObjectUpdateFunc,
forceAllowCreate bool,
options *metav1.UpdateOptions,
) (runtime.Object, bool, error) {
result, created, err := s.Store.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
if err != nil {
return nil, false, err
}
// Update dynamic registration
updatedCRD, ok := result.(*apiextensionsv1.CustomResourceDefinition)
if ok {
if err := s.dynamicReg.UpdateCRD(updatedCRD); err != nil {
fmt.Printf("Warning: failed to update CRD registration: %v\n", err)
}
}
return result, created, nil
}
// Delete deletes a CRD and unregisters the custom resource
func (s *crdStorage) Delete(
ctx context.Context,
name string,
deleteValidation rest.ValidateObjectFunc,
options *metav1.DeleteOptions,
) (runtime.Object, bool, error) {
// Get the CRD before deletion
obj, err := s.Store.Get(ctx, name, &metav1.GetOptions{})
if err != nil {
return nil, false, err
}
crd, ok := obj.(*apiextensionsv1.CustomResourceDefinition)
if !ok {
return nil, false, apierrors.NewInternalError(fmt.Errorf("object is not a CRD"))
}
// Delete the CRD
result, immediate, err := s.Store.Delete(ctx, name, deleteValidation, options)
if err != nil {
return nil, false, err
}
// Unregister the custom resource
if err := s.dynamicReg.UnregisterCRD(crd); err != nil {
fmt.Printf("Warning: failed to unregister CRD: %v\n", err)
}
return result, immediate, nil
}
func validateCRDForPhase1(crd *apiextensionsv1.CustomResourceDefinition) error {
// TODO(@konsalex): only support single-version CRDs for now
if len(crd.Spec.Versions) != 1 {
return apierrors.NewBadRequest(
fmt.Sprintf("we only support single-version CRDs, got %d versions", len(crd.Spec.Versions)),
)
}
// TODO(@konsalex): no webhook conversion for now we will need to support this
if crd.Spec.Conversion != nil && crd.Spec.Conversion.Strategy == apiextensionsv1.WebhookConverter {
return apierrors.NewBadRequest("no support webhook conversion")
}
// Ensure the single version is marked as both served and storage
if len(crd.Spec.Versions) > 0 {
version := crd.Spec.Versions[0]
if !version.Served {
return apierrors.NewBadRequest("the single version must be marked as served")
}
if !version.Storage {
return apierrors.NewBadRequest("the single version must be marked as storage")
}
}
return nil
}
// crdStatusStorage handles the status subresource for CRDs
type crdStatusStorage struct {
*genericregistry.Store
}
func (s *crdStatusStorage) New() runtime.Object {
return &apiextensionsv1.CustomResourceDefinition{}
}
func (s *crdStatusStorage) Get(
ctx context.Context,
name string,
options *metav1.GetOptions,
) (runtime.Object, error) {
return s.Store.Get(ctx, name, options)
}
func (s *crdStatusStorage) Update(
ctx context.Context,
name string,
objInfo rest.UpdatedObjectInfo,
createValidation rest.ValidateObjectFunc,
updateValidation rest.ValidateObjectUpdateFunc,
forceAllowCreate bool,
options *metav1.UpdateOptions,
) (runtime.Object, bool, error) {
return s.Store.Update(ctx, name, objInfo, createValidation, updateValidation, false, options)
}

View File

@@ -0,0 +1,687 @@
package apiextensions
import (
"context"
"fmt"
jsonpatch "github.com/evanphx/json-patch"
"github.com/google/uuid"
authlib "github.com/grafana/authlib/types"
apiextensions "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apiextensions-apiserver/pkg/apiserver/validation"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/json"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/registry/generic"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/apiserver/pkg/storage"
"github.com/grafana/grafana/pkg/apimachinery/utils"
grafanaregistry "github.com/grafana/grafana/pkg/apiserver/registry/generic"
"github.com/grafana/grafana/pkg/storage/unified/apistore"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
)
var _ rest.StandardStorage = (*customResourceStorage)(nil)
var _ rest.Patcher = (*customResourceStorage)(nil)
var _ rest.Scoper = (*customResourceStorage)(nil)
var (
internalScheme = runtime.NewScheme()
)
func init() {
_ = apiextensionsv1.AddToScheme(internalScheme)
_ = apiextensions.AddToScheme(internalScheme)
}
// customResourceStorage implements REST storage for custom resource instances
// It bypasses the generic registry store and calls unified storage directly
// to properly handle unstructured objects
type customResourceStorage struct {
storage storage.Interface // Direct unified storage interface
crd *apiextensionsv1.CustomResourceDefinition
version string
gvk schema.GroupVersionKind
gvr schema.GroupVersionResource
accessClient authlib.AccessClient
keyFunc func(ctx context.Context, name string) (string, error)
keyRootFunc func(ctx context.Context) string
}
// NewCustomResourceStorage creates storage for a custom resource based on its CRD
func NewCustomResourceStorage(
crd *apiextensionsv1.CustomResourceDefinition,
version string,
scheme *runtime.Scheme,
optsGetter generic.RESTOptionsGetter,
accessClient authlib.AccessClient,
unifiedClient resource.ResourceClient,
) (*customResourceStorage, error) {
gvk := schema.GroupVersionKind{
Group: crd.Spec.Group,
Version: version,
Kind: crd.Spec.Names.Kind,
}
gvr := schema.GroupVersionResource{
Group: crd.Spec.Group,
Version: version,
Resource: crd.Spec.Names.Plural,
}
// Register the custom resource type as unstructured
scheme.AddKnownTypeWithName(gvk, &unstructured.Unstructured{})
listGVK := gvk
listGVK.Kind = crd.Spec.Names.ListKind
scheme.AddKnownTypeWithName(listGVK, &unstructured.UnstructuredList{})
// Create resource info for this custom resource
resourceInfo := utils.NewResourceInfo(
crd.Spec.Group,
version,
crd.Spec.Names.Plural,
crd.Spec.Names.Singular,
crd.Spec.Names.Kind,
func() runtime.Object {
u := &unstructured.Unstructured{}
u.SetGroupVersionKind(gvk)
return u
},
func() runtime.Object {
ul := &unstructured.UnstructuredList{}
ul.SetGroupVersionKind(schema.GroupVersionKind{
Group: crd.Spec.Group,
Version: version,
Kind: crd.Spec.Names.ListKind,
})
return ul
},
utils.TableColumns{},
)
// Make it cluster-scoped if needed
if crd.Spec.Scope == apiextensionsv1.ClusterScoped {
resourceInfo = resourceInfo.WithClusterScope()
}
// Register storage options
if restOptGetter, ok := optsGetter.(*apistore.RESTOptionsGetter); ok {
restOptGetter.RegisterOptions(
gvr.GroupResource(),
apistore.StorageOptions{},
)
}
// We need this to set the codec below
gr := gvr.GroupResource()
opts, err := optsGetter.GetRESTOptions(gr, &unstructured.Unstructured{})
if err != nil {
return nil, fmt.Errorf("failed to get REST options: %w", err)
}
// This codec can handle any dynamic type without compile-time registration
// Without explicitly defined we get this error:
// https://github.com/kubernetes/apimachinery/blob/5a348c53eef0072c40ddf00a45ace423c2790f2a/pkg/runtime/error.go#L52
opts.StorageConfig.Codec = unstructured.UnstructuredJSONScheme
// Get the ConfigForResource
config := opts.StorageConfig.ForResource(gr)
// Create key functions for storage
keyFunc := func(obj runtime.Object) (string, error) {
accessor, err := utils.MetaAccessor(obj)
if err != nil {
return "", err
}
name := accessor.GetName()
ns := accessor.GetNamespace()
key := &grafanaregistry.Key{
Group: gr.Group,
Resource: gr.Resource,
Namespace: ns,
Name: name,
}
return key.String(), nil
}
keyParser := func(key string) (*resourcepb.ResourceKey, error) {
k, err := grafanaregistry.ParseKey(key)
if err != nil {
return nil, err
}
return &resourcepb.ResourceKey{
Namespace: k.Namespace,
Group: k.Group,
Resource: k.Resource,
Name: k.Name,
}, nil
}
// Create the actual storage using apistore.NewStorage
underlyingStorage, _, err := apistore.NewStorage(
config,
unifiedClient,
keyFunc,
keyParser,
func() runtime.Object { return &unstructured.Unstructured{} },
func() runtime.Object { return &unstructured.UnstructuredList{} },
grafanaregistry.GetAttrs,
nil, // trigger
nil, // indexers
nil, // configProvider
apistore.StorageOptions{},
)
if err != nil {
return nil, fmt.Errorf("failed to create storage: %w", err)
}
return &customResourceStorage{
storage: underlyingStorage,
crd: crd,
version: version,
gvk: gvk,
gvr: gvr,
accessClient: accessClient,
keyFunc: grafanaregistry.NamespaceKeyFunc(gr),
keyRootFunc: grafanaregistry.KeyRootFunc(gr),
}, nil
}
// NamespaceScoped returns whether this custom resource is namespaced
func (s *customResourceStorage) NamespaceScoped() bool {
return s.crd.Spec.Scope == apiextensionsv1.NamespaceScoped
}
// New returns a new instance of the custom resource
func (s *customResourceStorage) New() runtime.Object {
return &unstructured.Unstructured{}
}
// NewList returns a new list instance
func (s *customResourceStorage) NewList() runtime.Object {
return &unstructured.UnstructuredList{}
}
// getValidator returns a validator for the custom resource
func (s *customResourceStorage) getValidator() (validation.SchemaValidator, error) {
// Find the version we are serving
var version *apiextensionsv1.CustomResourceDefinitionVersion
for i := range s.crd.Spec.Versions {
v := &s.crd.Spec.Versions[i]
if v.Name == s.version {
version = v
break
}
}
// If no schema is defined, we can't validate
if version == nil || version.Schema == nil || version.Schema.OpenAPIV3Schema == nil {
return nil, nil
}
// Convert v1 schema to internal schema
internalSchema := &apiextensions.JSONSchemaProps{}
if err := internalScheme.Convert(version.Schema.OpenAPIV3Schema, internalSchema, nil); err != nil {
return nil, fmt.Errorf("failed to convert schema to internal version: %w", err)
}
// Create validator
val, _, err := validation.NewSchemaValidator(internalSchema)
return val, err
}
// Create validates and creates a custom resource instance
func (s *customResourceStorage) Create(
ctx context.Context,
obj runtime.Object,
createValidation rest.ValidateObjectFunc,
options *metav1.CreateOptions,
) (runtime.Object, error) {
// Ensure the object is unstructured as it is a "custom" CRD
u, ok := obj.(*unstructured.Unstructured)
if !ok {
return nil, apierrors.NewBadRequest("object must be unstructured")
}
// Validate GVK matches the CRD
// Could it arrive here without a match from storage match failure?
objGVK := u.GroupVersionKind()
if objGVK.Group != s.gvk.Group || objGVK.Version != s.gvk.Version || objGVK.Kind != s.gvk.Kind {
return nil, apierrors.NewBadRequest(
fmt.Sprintf("object GVK %s does not match CRD GVK %s", objGVK.String(), s.gvk.String()),
)
}
// Set defaults if not present
if u.GetNamespace() == "" && s.NamespaceScoped() {
u.SetNamespace("default")
}
if u.GetName() == "" {
u.SetName(uuid.New().String())
}
// Ensure GVK is set
u.SetGroupVersionKind(s.gvk)
// Run validation if provided
if createValidation != nil {
if err := createValidation(ctx, obj); err != nil {
return nil, err
}
}
// Validate against OpenAPI schema
validator, err := s.getValidator()
if err != nil {
return nil, apierrors.NewInternalError(fmt.Errorf("failed to get validator: %v", err))
}
if validator != nil {
if errs := validation.ValidateCustomResource(field.NewPath(""), u.UnstructuredContent(), validator); len(errs) > 0 {
return nil, apierrors.NewInvalid(s.gvk.GroupKind(), u.GetName(), errs)
}
}
// Generate storage key
key, err := s.keyFunc(ctx, u.GetName())
if err != nil {
return nil, fmt.Errorf("failed to generate key: %w", err)
}
if options != nil && len(options.DryRun) > 0 {
return u, nil
}
// Create the object in storage
// TODO(@konsalex): Figure out if this ttl should be (!=0)
out := &unstructured.Unstructured{}
if err := s.storage.Create(ctx, key, obj, out, 0); err != nil {
return nil, err
}
return out, nil
}
// Get retrieves a custom resource instance by name
func (s *customResourceStorage) Get(
ctx context.Context,
name string,
options *metav1.GetOptions,
) (runtime.Object, error) {
key, err := s.keyFunc(ctx, name)
if err != nil {
return nil, err
}
out := &unstructured.Unstructured{}
getOpts := storage.GetOptions{
IgnoreNotFound: false,
ResourceVersion: "",
}
if options != nil {
getOpts.ResourceVersion = options.ResourceVersion
}
if err := s.storage.Get(ctx, key, getOpts, out); err != nil {
return nil, err
}
return out, nil
}
// List retrieves a list of custom resource instances
func (s *customResourceStorage) List(
ctx context.Context,
options *metainternalversion.ListOptions,
) (runtime.Object, error) {
// Create an empty list to populate
listObj := &unstructured.UnstructuredList{}
listObj.SetGroupVersionKind(schema.GroupVersionKind{
Group: s.gvk.Group,
Version: s.gvk.Version,
Kind: s.crd.Spec.Names.ListKind,
})
// Handle nil options
if options == nil {
options = &metainternalversion.ListOptions{}
}
// Convert metainternalversion.ListOptions to storage.ListOptions
listOpts := storage.ListOptions{
ResourceVersion: options.ResourceVersion,
ResourceVersionMatch: options.ResourceVersionMatch,
Predicate: storage.Everything,
}
if options.LabelSelector != nil {
listOpts.Predicate.Label = options.LabelSelector
}
if options.FieldSelector != nil {
listOpts.Predicate.Field = options.FieldSelector
}
// Get the key prefix for listing
keyPrefix := s.keyRootFunc(ctx)
// Call the underlying storage List/GetList
if err := s.storage.GetList(ctx, keyPrefix, listOpts, listObj); err != nil {
return nil, err
}
return listObj, nil
}
// Delete removes a custom resource instance
func (s *customResourceStorage) Delete(
ctx context.Context,
name string,
deleteValidation rest.ValidateObjectFunc,
options *metav1.DeleteOptions,
) (runtime.Object, bool, error) {
key, err := s.keyFunc(ctx, name)
if err != nil {
return nil, false, err
}
out := &unstructured.Unstructured{}
preconditions := &storage.Preconditions{}
if options != nil && options.Preconditions != nil {
if options.Preconditions.UID != nil {
preconditions.UID = options.Preconditions.UID
}
if options.Preconditions.ResourceVersion != nil {
preconditions.ResourceVersion = options.Preconditions.ResourceVersion
}
}
validateFunc := func(ctx context.Context, obj runtime.Object) error {
if deleteValidation != nil {
return deleteValidation(ctx, obj)
}
return nil
}
if options != nil && len(options.DryRun) > 0 {
// We need to fetch it to make sure it exists and to return it
if err := s.storage.Get(ctx, key, storage.GetOptions{}, out); err != nil {
return nil, false, err
}
if err := validateFunc(ctx, out); err != nil {
return nil, false, err
}
return out, true, nil
}
if err := s.storage.Delete(ctx, key, out, preconditions, validateFunc, out, storage.DeleteOptions{}); err != nil {
return nil, false, err
}
return out, true, nil
}
// Update updates a custom resource instance
func (s *customResourceStorage) Update(
ctx context.Context,
name string,
objInfo rest.UpdatedObjectInfo,
createValidation rest.ValidateObjectFunc,
updateValidation rest.ValidateObjectUpdateFunc,
forceAllowCreate bool,
options *metav1.UpdateOptions,
) (runtime.Object, bool, error) {
key, err := s.keyFunc(ctx, name)
if err != nil {
return nil, false, err
}
// Get the existing object
existingObj := &unstructured.Unstructured{}
existingObj.SetGroupVersionKind(s.gvk)
err = s.storage.Get(ctx, key, storage.GetOptions{}, existingObj)
if err != nil {
if storage.IsNotFound(err) {
if !forceAllowCreate {
return nil, false, apierrors.NewNotFound(s.gvr.GroupResource(), name)
}
// If forceAllowCreate is true, treat as a create
// and not as an update
newObj, err := objInfo.UpdatedObject(ctx, nil)
if err != nil {
return nil, false, err
}
if createValidation != nil {
if err := createValidation(ctx, newObj); err != nil {
return nil, false, err
}
}
// Call Create internally
created, err := s.Create(ctx, newObj, nil, nil)
if err != nil {
return nil, false, err
}
return created, true, nil
}
return nil, false, err
}
// Get the updated object
updatedObj, err := objInfo.UpdatedObject(ctx, existingObj)
if err != nil {
return nil, false, err
}
// Validate the update
if updateValidation != nil {
if err := updateValidation(ctx, updatedObj, existingObj); err != nil {
return nil, false, err
}
}
// Ensure it's an unstructured object
updatedUnstructured, ok := updatedObj.(*unstructured.Unstructured)
if !ok {
return nil, false, fmt.Errorf("updated object is not unstructured")
}
// Set the GVK
updatedUnstructured.SetGroupVersionKind(s.gvk)
// Ensure namespace and name are set correctly
if s.NamespaceScoped() {
ns := updatedUnstructured.GetNamespace()
if ns == "" {
updatedUnstructured.SetNamespace("default")
}
}
updatedUnstructured.SetName(name)
// Validate against OpenAPI schema
validator, err := s.getValidator()
if err != nil {
return nil, false, apierrors.NewInternalError(fmt.Errorf("failed to get validator: %v", err))
}
if validator != nil {
// We need the old object as interface{}
if errs := validation.ValidateCustomResourceUpdate(field.NewPath(""), updatedUnstructured.UnstructuredContent(), existingObj.UnstructuredContent(), validator); len(errs) > 0 {
return nil, false, apierrors.NewInvalid(s.gvk.GroupKind(), name, errs)
}
}
// Use GuaranteedUpdate for optimistic concurrency control
out := &unstructured.Unstructured{}
out.SetGroupVersionKind(s.gvk)
updateFunc := func(input runtime.Object, respMeta storage.ResponseMeta) (runtime.Object, *uint64, error) {
// Return the updated object
return updatedUnstructured, nil, nil
}
if options != nil && len(options.DryRun) > 0 {
return updatedUnstructured, false, nil
}
preconditions := &storage.Preconditions{}
// We explicitly ignore the not-found, as we checked before proceeding
err = s.storage.GuaranteedUpdate(ctx, key, out, true, preconditions, updateFunc, out)
if err != nil {
return nil, false, err
}
return out, false, nil
}
// Patch patches a custom resource instance
func (s *customResourceStorage) Patch(
ctx context.Context,
name string,
patchType types.PatchType,
patchBytes []byte,
options *metav1.PatchOptions,
subresources ...string,
) (runtime.Object, error) {
key, err := s.keyFunc(ctx, name)
if err != nil {
return nil, err
}
existingObj := &unstructured.Unstructured{}
existingObj.SetGroupVersionKind(s.gvk)
err = s.storage.Get(ctx, key, storage.GetOptions{}, existingObj)
if err != nil {
if storage.IsNotFound(err) {
return nil, apierrors.NewNotFound(s.gvr.GroupResource(), name)
}
return nil, err
}
existingJSON, err := json.Marshal(existingObj.Object)
if err != nil {
return nil, fmt.Errorf("failed to marshal existing object: %w", err)
}
var patchedJSON []byte
switch patchType {
case types.JSONPatchType:
patch, err := jsonpatch.DecodePatch(patchBytes)
if err != nil {
return nil, fmt.Errorf("failed to decode JSON patch: %w", err)
}
patchedJSON, err = patch.Apply(existingJSON)
if err != nil {
return nil, fmt.Errorf("failed to apply JSON patch: %w", err)
}
case types.MergePatchType:
patchedJSON, err = jsonpatch.MergePatch(existingJSON, patchBytes)
if err != nil {
return nil, fmt.Errorf("failed to apply merge patch: %w", err)
}
case types.StrategicMergePatchType:
// Kubernetes Strategic Merge Patch
// For unstructured objects, strategic merge patch behaves like merge patch
// because we don't have a Go struct to define merge strategies
// TODO(@konsalex): Clarify is we need to even support this by falling-back to merge patch, or just return an error to inform clients
// patchedJSON, err = strategicpatch.StrategicMergePatch(existingJSON, patchBytes, &unstructured.Unstructured{})
// Fallback to merge patch if strategic merge fails
patchedJSON, err = jsonpatch.MergePatch(existingJSON, patchBytes)
if err != nil {
return nil, fmt.Errorf("failed to apply patch: %w", err)
}
default:
return nil, fmt.Errorf("unsupported patch type: %s", patchType)
}
// Unmarshal the patched JSON into an unstructured object
patchedObj := &unstructured.Unstructured{}
if err := json.Unmarshal(patchedJSON, &patchedObj.Object); err != nil {
return nil, fmt.Errorf("failed to unmarshal patched object: %w", err)
}
// Set the GVK
patchedObj.SetGroupVersionKind(s.gvk)
// Ensure namespace and name are set correctly
if s.NamespaceScoped() {
ns := patchedObj.GetNamespace()
if ns == "" {
patchedObj.SetNamespace(existingObj.GetNamespace())
}
}
patchedObj.SetName(name)
// Validate against OpenAPI schema
validator, err := s.getValidator()
if err != nil {
return nil, apierrors.NewInternalError(fmt.Errorf("failed to get validator: %v", err))
}
if validator != nil {
// We need the old object as interface{}
if errs := validation.ValidateCustomResourceUpdate(field.NewPath(""), patchedObj.UnstructuredContent(), existingObj.UnstructuredContent(), validator); len(errs) > 0 {
return nil, apierrors.NewInvalid(s.gvk.GroupKind(), name, errs)
}
}
// Use GuaranteedUpdate to save the patched object
out := &unstructured.Unstructured{}
out.SetGroupVersionKind(s.gvk)
updateFunc := func(input runtime.Object, respMeta storage.ResponseMeta) (runtime.Object, *uint64, error) {
return patchedObj, nil, nil
}
preconditions := &storage.Preconditions{}
if options != nil && options.DryRun != nil && len(options.DryRun) > 0 {
fmt.Printf(" - Dry run mode\n")
}
err = s.storage.GuaranteedUpdate(ctx, key, out, true, preconditions, updateFunc, out)
if err != nil {
return nil, err
}
return out, nil
}
// DeleteCollection deletes a collection of custom resources
func (s *customResourceStorage) DeleteCollection(
ctx context.Context,
deleteValidation rest.ValidateObjectFunc,
options *metav1.DeleteOptions,
listOptions *metainternalversion.ListOptions,
) (runtime.Object, error) {
return nil, apierrors.NewMethodNotSupported(s.gvr.GroupResource(), "deletecollection")
}
// Watch returns a watch interface for custom resources
func (s *customResourceStorage) Watch(ctx context.Context, options *metainternalversion.ListOptions) (watch.Interface, error) {
return nil, apierrors.NewMethodNotSupported(s.gvr.GroupResource(), "watch")
}
// ConvertToTable converts to a table for kubectl
func (s *customResourceStorage) ConvertToTable(ctx context.Context, object runtime.Object, tableOptions runtime.Object) (*metav1.Table, error) {
return nil, apierrors.NewMethodNotSupported(s.gvr.GroupResource(), "table")
}
// Destroy cleans up resources
func (s *customResourceStorage) Destroy() {
// Nothing to clean up for direct storage
}

View File

@@ -1,6 +1,7 @@
package apiregistry
import (
"github.com/grafana/grafana/pkg/registry/apis/apiextensions"
dashboardinternal "github.com/grafana/grafana/pkg/registry/apis/dashboard"
"github.com/grafana/grafana/pkg/registry/apis/dashboardsnapshot"
"github.com/grafana/grafana/pkg/registry/apis/datasource"
@@ -19,6 +20,7 @@ type Service struct{}
// ProvideRegistryServiceSink is an entry point for each service that will force initialization
// and give each builder the chance to register itself with the main server
func ProvideRegistryServiceSink(
_ *apiextensions.APIExtensionsBuilder,
_ *dashboardinternal.DashboardsAPIBuilder,
_ *dashboardsnapshot.SnapshotsAPIBuilder,
_ *datasource.DataSourceAPIBuilder,

View File

@@ -3,6 +3,7 @@ package apiregistry
import (
"github.com/google/wire"
"github.com/grafana/grafana/pkg/registry/apis/apiextensions"
dashboardinternal "github.com/grafana/grafana/pkg/registry/apis/dashboard"
"github.com/grafana/grafana/pkg/registry/apis/dashboardsnapshot"
"github.com/grafana/grafana/pkg/registry/apis/datasource"
@@ -54,6 +55,7 @@ var WireSet = wire.NewSet(
provisioningExtras,
// Each must be added here *and* in the ServiceSink above
apiextensions.RegisterAPIService,
dashboardinternal.RegisterAPIService,
dashboardsnapshot.RegisterAPIService,
datasource.RegisterAPIService,

13
pkg/server/wire_gen.go generated
View File

@@ -48,6 +48,7 @@ import (
"github.com/grafana/grafana/pkg/plugins/pluginscdn"
"github.com/grafana/grafana/pkg/plugins/repo"
"github.com/grafana/grafana/pkg/registry/apis"
"github.com/grafana/grafana/pkg/registry/apis/apiextensions"
"github.com/grafana/grafana/pkg/registry/apis/dashboard"
"github.com/grafana/grafana/pkg/registry/apis/dashboard/legacy"
"github.com/grafana/grafana/pkg/registry/apis/dashboardsnapshot"
@@ -848,6 +849,10 @@ func Initialize(ctx context.Context, cfg *setting.Cfg, opts Options, apiOpts api
identitySynchronizer := authnimpl.ProvideIdentitySynchronizer(authnimplService)
ldapImpl := service12.ProvideService(cfg, featureToggles, ssosettingsimplService)
apiService := api4.ProvideService(cfg, routeRegisterImpl, accessControl, userService, authinfoimplService, ossGroups, identitySynchronizer, orgService, ldapImpl, userAuthTokenService, bundleregistryService)
apiExtensionsBuilder, err := apiextensions.RegisterAPIService(cfg, featureToggles, apiserverService, accessClient, registerer, resourceClient)
if err != nil {
return nil, err
}
dashboardsAPIBuilder := dashboard.RegisterAPIService(cfg, featureToggles, apiserverService, dashboardService, dashboardProvisioningService, service15, dashboardServiceImpl, dashboardPermissionsService, accessControl, accessClient, provisioningServiceImpl, dashboardsStore, registerer, sqlStore, tracingService, resourceClient, dualwriteService, sortService, quotaService, libraryPanelService, eventualRestConfigProvider, userService, libraryElementService, publicDashboardServiceImpl)
snapshotsAPIBuilder := dashboardsnapshot.RegisterAPIService(serviceImpl, apiserverService, cfg, featureToggles, sqlStore, registerer)
dataSourceAPIBuilder, err := datasource.RegisterAPIService(featureToggles, apiserverService, middlewareHandler, scopedPluginDatasourceProvider, plugincontextProvider, accessControl, registerer, sourcesService)
@@ -900,7 +905,7 @@ func Initialize(ctx context.Context, cfg *setting.Cfg, opts Options, apiOpts api
if err != nil {
return nil, err
}
apiregistryService := apiregistry.ProvideRegistryServiceSink(dashboardsAPIBuilder, snapshotsAPIBuilder, dataSourceAPIBuilder, folderAPIBuilder, identityAccessManagementAPIBuilder, queryAPIBuilder, userStorageAPIBuilder, apiBuilder, provisioningAPIBuilder, ofrepAPIBuilder, dependencyRegisterer, provisioningDependencyRegisterer)
apiregistryService := apiregistry.ProvideRegistryServiceSink(apiExtensionsBuilder, dashboardsAPIBuilder, snapshotsAPIBuilder, dataSourceAPIBuilder, folderAPIBuilder, identityAccessManagementAPIBuilder, queryAPIBuilder, userStorageAPIBuilder, apiBuilder, provisioningAPIBuilder, ofrepAPIBuilder, dependencyRegisterer, provisioningDependencyRegisterer)
teamPermissionsService, err := ossaccesscontrol.ProvideTeamPermissions(cfg, featureToggles, routeRegisterImpl, sqlStore, accessControl, ossLicensingService, acimplService, teamService, userService, actionSetService)
if err != nil {
return nil, err
@@ -1489,6 +1494,10 @@ func InitializeForTest(ctx context.Context, t sqlutil.ITestDB, testingT interfac
identitySynchronizer := authnimpl.ProvideIdentitySynchronizer(authnimplService)
ldapImpl := service12.ProvideService(cfg, featureToggles, ssosettingsimplService)
apiService := api4.ProvideService(cfg, routeRegisterImpl, accessControl, userService, authinfoimplService, ossGroups, identitySynchronizer, orgService, ldapImpl, userAuthTokenService, bundleregistryService)
apiExtensionsBuilder, err := apiextensions.RegisterAPIService(cfg, featureToggles, apiserverService, accessClient, registerer, resourceClient)
if err != nil {
return nil, err
}
dashboardsAPIBuilder := dashboard.RegisterAPIService(cfg, featureToggles, apiserverService, dashboardService, dashboardProvisioningService, service15, dashboardServiceImpl, dashboardPermissionsService, accessControl, accessClient, provisioningServiceImpl, dashboardsStore, registerer, sqlStore, tracingService, resourceClient, dualwriteService, sortService, quotaService, libraryPanelService, eventualRestConfigProvider, userService, libraryElementService, publicDashboardServiceImpl)
snapshotsAPIBuilder := dashboardsnapshot.RegisterAPIService(serviceImpl, apiserverService, cfg, featureToggles, sqlStore, registerer)
dataSourceAPIBuilder, err := datasource.RegisterAPIService(featureToggles, apiserverService, middlewareHandler, scopedPluginDatasourceProvider, plugincontextProvider, accessControl, registerer, sourcesService)
@@ -1541,7 +1550,7 @@ func InitializeForTest(ctx context.Context, t sqlutil.ITestDB, testingT interfac
if err != nil {
return nil, err
}
apiregistryService := apiregistry.ProvideRegistryServiceSink(dashboardsAPIBuilder, snapshotsAPIBuilder, dataSourceAPIBuilder, folderAPIBuilder, identityAccessManagementAPIBuilder, queryAPIBuilder, userStorageAPIBuilder, apiBuilder, provisioningAPIBuilder, ofrepAPIBuilder, dependencyRegisterer, provisioningDependencyRegisterer)
apiregistryService := apiregistry.ProvideRegistryServiceSink(apiExtensionsBuilder, dashboardsAPIBuilder, snapshotsAPIBuilder, dataSourceAPIBuilder, folderAPIBuilder, identityAccessManagementAPIBuilder, queryAPIBuilder, userStorageAPIBuilder, apiBuilder, provisioningAPIBuilder, ofrepAPIBuilder, dependencyRegisterer, provisioningDependencyRegisterer)
teamPermissionsService, err := ossaccesscontrol.ProvideTeamPermissions(cfg, featureToggles, routeRegisterImpl, sqlStore, accessControl, ossLicensingService, acimplService, teamService, userService, actionSetService)
if err != nil {
return nil, err

View File

@@ -0,0 +1,73 @@
package apiserver
import (
"net/http"
"strings"
"github.com/grafana/grafana/pkg/services/apiserver/builder"
)
// createDynamicHandlerWrapper wraps the not-found handler with dynamic custom resource handlers
func (s *service) createDynamicHandlerWrapper(builders []builder.APIGroupBuilder, notFoundHandler http.Handler) http.Handler {
// Look for the APIExtensionsBuilder - we'll fetch the handler lazily on each request
// because the handler is created during UpdateAPIGroupInfo which happens AFTER this wrapper is installed
type dynamicHandlerProvider interface {
GetDynamicHandler() http.Handler
}
var dhProvider dynamicHandlerProvider
for _, b := range builders {
if provider, ok := b.(dynamicHandlerProvider); ok {
dhProvider = provider
break
}
}
if dhProvider == nil {
return notFoundHandler
}
// Return a wrapper that lazily fetches and tries the dynamic handler on each request
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Check if this is an /apis/ request that might be for a custom resource
if strings.HasPrefix(r.URL.Path, "/apis/") {
// Fetch the dynamic handler (it will be nil until UpdateAPIGroupInfo creates it)
dynamicHandler := dhProvider.GetDynamicHandler()
if dynamicHandler != nil {
// Create a response recorder to capture what the dynamic handler does
recorder := &responseRecorder{
ResponseWriter: w,
statusCode: 0,
}
dynamicHandler.ServeHTTP(recorder, r)
// If the dynamic handler handled it (didn't return 404), we're done
if recorder.statusCode != 0 && recorder.statusCode != http.StatusNotFound {
return
}
}
}
// Otherwise, fall back to the not-found handler
notFoundHandler.ServeHTTP(w, r)
})
}
// responseRecorder captures the status code from a handler
type responseRecorder struct {
http.ResponseWriter
statusCode int
}
func (r *responseRecorder) WriteHeader(statusCode int) {
r.statusCode = statusCode
r.ResponseWriter.WriteHeader(statusCode)
}
func (r *responseRecorder) Write(b []byte) (int, error) {
if r.statusCode == 0 {
r.statusCode = http.StatusOK
}
return r.ResponseWriter.Write(b)
}

View File

@@ -374,16 +374,36 @@ func (s *service) start(ctx context.Context) error {
notFoundHandler := notfoundhandler.New(s.codecs, genericapifilters.NoMuxAndDiscoveryIncompleteKey)
var finalHandler http.Handler = notFoundHandler
//nolint:staticcheck // not yet migrated to OpenFeature
if s.features.IsEnabledGlobally(featuremgmt.FlagApiExtensions) {
// Wrap the not-found handler with dynamic custom resource handler
finalHandler = s.createDynamicHandlerWrapper(builders, notFoundHandler)
}
if err := appinstaller.RegisterPostStartHooks(s.appInstallers, serverConfig); err != nil {
return fmt.Errorf("failed to register post start hooks for app installers: %w", err)
}
// Create the server
server, err := serverConfig.Complete().New("grafana-apiserver", genericapiserver.NewEmptyDelegateWithCustomHandler(notFoundHandler))
server, err := serverConfig.Complete().New("grafana-apiserver", genericapiserver.NewEmptyDelegateWithCustomHandler(finalHandler))
if err != nil {
return err
}
//nolint:staticcheck // not yet migrated to OpenFeature
if s.features.IsEnabledGlobally(featuremgmt.FlagApiExtensions) {
// Inject the server instance into any builders that need it (e.g., APIExtensionsBuilder for dynamic CRD registration)
type apiServerSetter interface {
SetAPIServer(server *genericapiserver.GenericAPIServer)
}
for _, b := range builders {
if setter, ok := b.(apiServerSetter); ok {
setter.SetAPIServer(server)
}
}
}
// Install the API group+version for existing builders
err = builder.InstallAPIs(s.scheme, s.codecs, server, serverConfig.RESTOptionsGetter, builders, o.StorageOptions,
s.metrics,

View File

@@ -830,6 +830,13 @@ var (
Owner: grafanaAppPlatformSquad,
RequiresRestart: true,
},
{
Name: "apiExtensions",
Description: "Enable Kubernetes CustomResourceDefinition (CRD) support with dynamic API registration",
Stage: FeatureStageExperimental,
Owner: grafanaAppPlatformSquad,
RequiresRestart: true,
},
{
Name: "groupByVariable",
Description: "Enable groupBy variable support in scenes dashboards",

View File

@@ -447,6 +447,10 @@ const (
// Enable CAP token based authentication in grafana&#39;s embedded kube-aggregator
FlagKubernetesAggregatorCapTokenAuth = "kubernetesAggregatorCapTokenAuth"
// FlagApiExtensions
// Enable Kubernetes CustomResourceDefinition (CRD) support with dynamic API registration
FlagApiExtensions = "apiExtensions"
// FlagGroupByVariable
// Enable groupBy variable support in scenes dashboards
FlagGroupByVariable = "groupByVariable"

View File

@@ -134,6 +134,16 @@ func (c authzLimitedClient) Check(ctx context.Context, id claims.AuthInfo, req c
return claims.CheckResponse{Allowed: true}, nil
}
// Hack, allow creation of Cluster scoped resources (ex. register CRDs)
// We need to make sure it is the correct service account,
// not just any service account.
// This is called when we submit a CRD (not when we list them)
// Currently creating them with a Service Account thus the match
if req.Namespace == "" && claims.IsIdentityType(id.GetIdentityType(), claims.TypeServiceAccount, claims.TypeAccessPolicy) {
span.SetAttributes(attribute.Bool("allowed", true))
return claims.CheckResponse{Allowed: true}, nil
}
if !claims.NamespaceMatches(id.GetNamespace(), req.Namespace) {
span.SetAttributes(attribute.Bool("allowed", false))
span.SetStatus(codes.Error, "Namespace mismatch")
@@ -183,7 +193,14 @@ func (c authzLimitedClient) Compile(ctx context.Context, id claims.AuthInfo, req
return true
}, claims.NoopZookie{}, nil
}
if !claims.NamespaceMatches(id.GetNamespace(), req.Namespace) {
// Hack, allow system tokens to be able to
// access Cluster scoped resources (ex. register CRDs)
// We need to make sure it is the correct service account,
// not just any service account
isServiceAccnt := req.Namespace == "" && claims.IsIdentityType(id.GetIdentityType(), claims.TypeAccessPolicy)
if !claims.NamespaceMatches(id.GetNamespace(), req.Namespace) && !isServiceAccnt {
span.SetAttributes(attribute.Bool("allowed", false))
span.SetStatus(codes.Error, "Namespace mismatch")
span.RecordError(claims.ErrNamespaceMismatch)