mirror of
https://github.com/grafana/grafana.git
synced 2025-12-21 03:54:29 +08:00
Compare commits
3 Commits
zoltan/pos
...
apiextensi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3e12e47704 | ||
|
|
939170c7d5 | ||
|
|
c0af936bcd |
46
conf/apiextensions.ini
Normal file
46
conf/apiextensions.ini
Normal file
@@ -0,0 +1,46 @@
|
||||
; Run locally unified storage with SQLite to test
|
||||
; new API registration changes
|
||||
app_mode = development
|
||||
target = all
|
||||
|
||||
[log]
|
||||
level = debug
|
||||
|
||||
[server]
|
||||
; HTTPS is required for kubectl (but HTTP works for testing with curl)
|
||||
protocol = https
|
||||
http_port = 1111
|
||||
|
||||
[feature_toggles]
|
||||
; Enable the apiextensions feature
|
||||
apiExtensions = true
|
||||
; Enable unified storage globally
|
||||
unifiedStorage = true
|
||||
; Enable search indexing for unified storage
|
||||
unifiedStorageSearch = true
|
||||
; Enable the grafana-apiserver explicitly
|
||||
grafanaAPIServer = true
|
||||
|
||||
[grafana-apiserver]
|
||||
; Use unified storage backed by SQL (uses your Grafana database)
|
||||
storage_type = unified
|
||||
|
||||
; Configure dashboards to use unified storage
|
||||
[unified_storage.dashboards.dashboard.grafana.app]
|
||||
dualWriterMode = 5
|
||||
|
||||
; Configure folders to use unified storage (required for dashboards)
|
||||
[unified_storage.folders.folder.grafana.app]
|
||||
dualWriterMode = 5
|
||||
|
||||
[database]
|
||||
; SQLite database for testing
|
||||
type = sqlite3
|
||||
path = grafana.db
|
||||
high_availability = false
|
||||
|
||||
; Will only be used for the MT grafana
|
||||
; apiextensions service
|
||||
; [auth.extended_jwt]
|
||||
; enabled = true
|
||||
; jwks_url = "http://localhost:6481/jwks"
|
||||
4
go.mod
4
go.mod
@@ -645,7 +645,7 @@ require (
|
||||
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
|
||||
gopkg.in/src-d/go-errors.v1 v1.0.0 // indirect
|
||||
gopkg.in/telebot.v3 v3.3.8 // indirect
|
||||
k8s.io/apiextensions-apiserver v0.34.2 // indirect
|
||||
k8s.io/apiextensions-apiserver v0.34.2
|
||||
k8s.io/kms v0.34.2 // indirect
|
||||
modernc.org/libc v1.66.10 // indirect
|
||||
modernc.org/mathutil v1.7.1 // indirect
|
||||
@@ -657,6 +657,8 @@ require (
|
||||
|
||||
require github.com/grafana/tempo v1.5.1-0.20250529124718-87c2dc380cec // @grafana/observability-traces-and-profiling
|
||||
|
||||
require github.com/evanphx/json-patch v5.9.11+incompatible
|
||||
|
||||
require (
|
||||
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect
|
||||
github.com/containerd/log v0.1.0 // indirect
|
||||
|
||||
@@ -159,6 +159,7 @@ var serviceIdentityTokenPermissions = []string{
|
||||
"query.grafana.app:*",
|
||||
"iam.grafana.app:*",
|
||||
"preferences.grafana.app:*",
|
||||
"apiextensions.grafana.app:*",
|
||||
|
||||
// Secrets Manager uses a custom verb for secret decryption, and its authorizer does not allow wildcard permissions.
|
||||
"secret.grafana.app/securevalues:decrypt",
|
||||
|
||||
@@ -131,19 +131,31 @@ func NamespaceKeyFunc(gr schema.GroupResource) func(ctx context.Context, name st
|
||||
}
|
||||
}
|
||||
|
||||
// NoNamespaceKeyFunc is the default function for constructing storage paths
|
||||
// to a resource relative to the given prefix without a namespace.
|
||||
func NoNamespaceKeyFunc(ctx context.Context, prefix string, gr schema.GroupResource, name string) (string, error) {
|
||||
if len(name) == 0 {
|
||||
return "", apierrors.NewBadRequest("Name parameter required.")
|
||||
// ClusterScopedKeyFunc constructs storage paths for cluster-scoped resources (no namespace).
|
||||
func ClusterScopedKeyFunc(gr schema.GroupResource) func(ctx context.Context, name string) (string, error) {
|
||||
return func(ctx context.Context, name string) (string, error) {
|
||||
if len(name) == 0 {
|
||||
return "", apierrors.NewBadRequest("Name parameter required.")
|
||||
}
|
||||
if msgs := path.IsValidPathSegmentName(name); len(msgs) != 0 {
|
||||
return "", apierrors.NewBadRequest(fmt.Sprintf("Name parameter invalid: %q: %s", name, strings.Join(msgs, ";")))
|
||||
}
|
||||
key := &Key{
|
||||
Group: gr.Group,
|
||||
Resource: gr.Resource,
|
||||
Name: name,
|
||||
}
|
||||
return key.String(), nil
|
||||
}
|
||||
}
|
||||
|
||||
// ClusterScopedKeyRootFunc is used by the generic registry store for cluster-scoped resources.
|
||||
func ClusterScopedKeyRootFunc(gr schema.GroupResource) func(ctx context.Context) string {
|
||||
return func(ctx context.Context) string {
|
||||
key := &Key{
|
||||
Group: gr.Group,
|
||||
Resource: gr.Resource,
|
||||
}
|
||||
return key.String()
|
||||
}
|
||||
if msgs := path.IsValidPathSegmentName(name); len(msgs) != 0 {
|
||||
return "", apierrors.NewBadRequest(fmt.Sprintf("Name parameter invalid: %q: %s", name, strings.Join(msgs, ";")))
|
||||
}
|
||||
key := &Key{
|
||||
Group: gr.Group,
|
||||
Resource: gr.Resource,
|
||||
Name: name,
|
||||
}
|
||||
return prefix + key.String(), nil
|
||||
}
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
package generic
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apiserver/pkg/registry/generic"
|
||||
"k8s.io/apiserver/pkg/registry/generic/registry"
|
||||
@@ -12,16 +14,27 @@ func NewRegistryStore(scheme *runtime.Scheme, resourceInfo utils.ResourceInfo, o
|
||||
gv := resourceInfo.GroupVersion()
|
||||
gv.Version = runtime.APIVersionInternal
|
||||
strategy := NewStrategy(scheme, gv)
|
||||
|
||||
gr := resourceInfo.GroupResource()
|
||||
var keyRootFunc func(ctx context.Context) string
|
||||
var keyFunc func(ctx context.Context, name string) (string, error)
|
||||
|
||||
if resourceInfo.IsClusterScoped() {
|
||||
strategy = strategy.WithClusterScope()
|
||||
keyRootFunc = ClusterScopedKeyRootFunc(gr)
|
||||
keyFunc = ClusterScopedKeyFunc(gr)
|
||||
} else {
|
||||
keyRootFunc = KeyRootFunc(gr)
|
||||
keyFunc = NamespaceKeyFunc(gr)
|
||||
}
|
||||
|
||||
store := ®istry.Store{
|
||||
NewFunc: resourceInfo.NewFunc,
|
||||
NewListFunc: resourceInfo.NewListFunc,
|
||||
KeyRootFunc: KeyRootFunc(resourceInfo.GroupResource()),
|
||||
KeyFunc: NamespaceKeyFunc(resourceInfo.GroupResource()),
|
||||
KeyRootFunc: keyRootFunc,
|
||||
KeyFunc: keyFunc,
|
||||
PredicateFunc: Matcher,
|
||||
DefaultQualifiedResource: resourceInfo.GroupResource(),
|
||||
DefaultQualifiedResource: gr,
|
||||
SingularQualifiedResource: resourceInfo.SingularGroupResource(),
|
||||
TableConvertor: resourceInfo.TableConverter(),
|
||||
CreateStrategy: strategy,
|
||||
|
||||
@@ -15,7 +15,6 @@ import (
|
||||
_ "github.com/blugelabs/bluge"
|
||||
_ "github.com/blugelabs/bluge_segment_api"
|
||||
_ "github.com/crewjam/saml"
|
||||
_ "github.com/docker/go-connections/nat"
|
||||
_ "github.com/go-jose/go-jose/v4"
|
||||
_ "github.com/gobwas/glob"
|
||||
_ "github.com/googleapis/gax-go/v2"
|
||||
@@ -31,7 +30,6 @@ import (
|
||||
_ "github.com/spf13/cobra" // used by the standalone apiserver cli
|
||||
_ "github.com/spyzhov/ajson"
|
||||
_ "github.com/stretchr/testify/require"
|
||||
_ "github.com/testcontainers/testcontainers-go"
|
||||
_ "gocloud.dev/secrets/awskms"
|
||||
_ "gocloud.dev/secrets/azurekeyvault"
|
||||
_ "gocloud.dev/secrets/gcpkms"
|
||||
@@ -56,7 +54,9 @@ import (
|
||||
_ "github.com/grafana/e2e"
|
||||
_ "github.com/grafana/gofpdf"
|
||||
_ "github.com/grafana/gomemcache/memcache"
|
||||
_ "github.com/grafana/tempo/pkg/traceql"
|
||||
|
||||
_ "github.com/grafana/grafana/apps/alerting/alertenrichment/pkg/apis/alertenrichment/v1beta1"
|
||||
_ "github.com/grafana/grafana/apps/scope/pkg/apis/scope/v0alpha1"
|
||||
_ "github.com/grafana/tempo/pkg/traceql"
|
||||
_ "github.com/testcontainers/testcontainers-go"
|
||||
)
|
||||
|
||||
113
pkg/registry/apis/apiextensions/README.md
Normal file
113
pkg/registry/apis/apiextensions/README.md
Normal file
@@ -0,0 +1,113 @@
|
||||
# Grafana CRD Support - POC Testing Guide
|
||||
|
||||
This directory contains example CRD definitions and custom resources for testing the Kubernetes CustomResourceDefinition (CRD) support in Grafana server.
|
||||
|
||||
|
||||
## How to run
|
||||
|
||||
To run just compile Grafana with `make build-go` and then run the service with the `apiextensions.ini` provided under the `conf` folder.
|
||||
|
||||
```bash
|
||||
./bin/darwin-arm64/grafana server --config conf/apiextensions.ini
|
||||
```
|
||||
|
||||
To enable this feature we use the `apiExtensions = true` flag and also have unified storage as our storage backend.
|
||||
|
||||
There is no need for US service to run in ST Grafana, and the database will be a SQLite one.
|
||||
|
||||
## Testing Steps
|
||||
|
||||
### Step 1: Create a CustomResourceDefinition
|
||||
|
||||
Create the example CRD that defines a "Widget" resource:
|
||||
|
||||
```bash
|
||||
kubectl apply -f ./pkg/registry/apis/apiextensions/resources/example-crd.yaml
|
||||
```
|
||||
|
||||
Or use curl after you set a Grafana Service account token:
|
||||
```bash
|
||||
export AUTH_SVC="Authorization: Bearer glsa_<rest of the token>"
|
||||
```
|
||||
|
||||
```bash
|
||||
# From Grafana root
|
||||
curl -k -X POST https://localhost:1111/apis/apiextensions.k8s.io/v1/customresourcedefinitions \
|
||||
-H "$AUTH_SVC" \
|
||||
-H "Content-Type: application/yaml" \
|
||||
--data-binary @$PWD/pkg/registry/apis/apiextensions/resources/example-crd.yaml
|
||||
```
|
||||
|
||||
### Step 2: Verify the CRD was created
|
||||
|
||||
List all CRDs:
|
||||
|
||||
```bash
|
||||
# Be sure to use the generated kube-config file
|
||||
KUBECONFIG=$PWD/data/grafana-apiserver/apiserver.kubeconfig \
|
||||
kubectl get customresourcedefinitions.apiextensions.k8s.io
|
||||
|
||||
# Or with curl:
|
||||
curl -k -X GET https://localhost:1111/apis/apiextensions.k8s.io/v1/customresourcedefinitions \
|
||||
-H "$AUTH_SVC"
|
||||
```
|
||||
|
||||
### Step 3: Create a Custom Resource Instance
|
||||
|
||||
Now that the CRD is registered, create an instance of the Widget resource:
|
||||
|
||||
```bash
|
||||
kubectl apply -f ./pkg/registry/apis/apiextensions/resources/example-widget.yaml
|
||||
```
|
||||
|
||||
Or use curl:
|
||||
|
||||
```bash
|
||||
curl -k -X POST https://localhost:1111/apis/customcrdtest.grafana.app/v1/namespaces/default/widgets \
|
||||
-H "$AUTH_SVC" \
|
||||
-H "Content-Type: application/yaml" \
|
||||
--data-binary @$PWD/pkg/registry/apis/apiextensions/resources/example-widget.yaml
|
||||
```
|
||||
|
||||
### Step 4: Verify the Custom Resource
|
||||
|
||||
> Note: You can see all the resources in SQLite database called `grafana.db`
|
||||
|
||||
List all widgets:
|
||||
|
||||
```bash
|
||||
KUBECONFIG=$PWD/data/grafana-apiserver/apiserver.kubeconfig \
|
||||
kubectl get widgets -n default
|
||||
|
||||
# Or with curl:
|
||||
curl -k -X GET https://localhost:1111/apis/customcrdtest.grafana.app/v1/namespaces/default/widgets \
|
||||
-H "$AUTH_SVC" | jq .
|
||||
```
|
||||
|
||||
### Step 5: Update the Custom Resource
|
||||
|
||||
Update the widget's spec:
|
||||
|
||||
```bash
|
||||
KUBECONFIG=$PWD/data/grafana-apiserver/apiserver.kubeconfig \
|
||||
kubectl edit widget my-widget -n default
|
||||
|
||||
# Or with curl (PATCH):
|
||||
curl -X PATCH https://localhost:1111/apis/customcrdtest.grafana.app/v1/namespaces/default/widgets/my-widget \
|
||||
-H "Content-Type: application/merge-patch+json" \
|
||||
-H "$AUTH_SVC" \
|
||||
-d '{"spec":{"replicas":5}}'
|
||||
```
|
||||
|
||||
|
||||
|
||||
## What is left
|
||||
|
||||
- [ ] Support multiple versions of CRDs (example in `discovery.go`)
|
||||
- [ ] Watch new CRDs, so we do not require server restart `dynamic_registry.go`. This is needed for horizontal deployments.
|
||||
- [ ] Support `/status` subresource
|
||||
- [ ] Add tracer and logger and remove `fmt.Print`
|
||||
- [ ] Implement MT setup
|
||||
- [ ] Figure out how to modify storage checks for Cluster scoped resources (when we create a new CRD)
|
||||
- [ ] How to tackle Cluster scoped CRs
|
||||
- [ ] Use the feature flag to start the `apiextensions` service on-demand
|
||||
167
pkg/registry/apis/apiextensions/discovery.go
Normal file
167
pkg/registry/apis/apiextensions/discovery.go
Normal file
@@ -0,0 +1,167 @@
|
||||
package apiextensions
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"sync"
|
||||
|
||||
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
)
|
||||
|
||||
// DiscoveryManager manages discovery for dynamically registered custom resources
|
||||
type DiscoveryManager struct {
|
||||
mu sync.RWMutex
|
||||
apiGroups map[string]*metav1.APIGroup // group name -> APIGroup
|
||||
resources map[string]*metav1.APIResourceList // group/version -> APIResourceList
|
||||
}
|
||||
|
||||
// NewDiscoveryManager creates a new discovery manager
|
||||
func NewDiscoveryManager() *DiscoveryManager {
|
||||
return &DiscoveryManager{
|
||||
apiGroups: make(map[string]*metav1.APIGroup),
|
||||
resources: make(map[string]*metav1.APIResourceList),
|
||||
}
|
||||
}
|
||||
|
||||
// AddCustomResource adds a custom resource to the discovery documents
|
||||
func (d *DiscoveryManager) AddCustomResource(crd *apiextensionsv1.CustomResourceDefinition) {
|
||||
d.mu.Lock()
|
||||
defer d.mu.Unlock()
|
||||
|
||||
group := crd.Spec.Group
|
||||
// TODO(@konsalex): is this needed to be iterated
|
||||
// or multiple versions will be different crd entries?
|
||||
// Need to test this out
|
||||
version := crd.Spec.Versions[0].Name
|
||||
gvKey := fmt.Sprintf("%s/%s", group, version)
|
||||
|
||||
// Update API Group discovery
|
||||
apiGroup, ok := d.apiGroups[group]
|
||||
if !ok {
|
||||
apiGroup = &metav1.APIGroup{
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
Kind: "APIGroup",
|
||||
APIVersion: "v1",
|
||||
},
|
||||
Name: group,
|
||||
Versions: []metav1.GroupVersionForDiscovery{
|
||||
{
|
||||
GroupVersion: fmt.Sprintf("%s/%s", group, version),
|
||||
Version: version,
|
||||
},
|
||||
},
|
||||
PreferredVersion: metav1.GroupVersionForDiscovery{
|
||||
GroupVersion: fmt.Sprintf("%s/%s", group, version),
|
||||
Version: version,
|
||||
},
|
||||
}
|
||||
d.apiGroups[group] = apiGroup
|
||||
}
|
||||
|
||||
// Update API Resource List
|
||||
resourceList, ok := d.resources[gvKey]
|
||||
if !ok {
|
||||
resourceList = &metav1.APIResourceList{
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
Kind: "APIResourceList",
|
||||
APIVersion: "v1",
|
||||
},
|
||||
GroupVersion: fmt.Sprintf("%s/%s", group, version),
|
||||
APIResources: []metav1.APIResource{},
|
||||
}
|
||||
d.resources[gvKey] = resourceList
|
||||
}
|
||||
|
||||
// Add the resource to the list
|
||||
apiResource := metav1.APIResource{
|
||||
Name: crd.Spec.Names.Plural,
|
||||
SingularName: crd.Spec.Names.Singular,
|
||||
Namespaced: crd.Spec.Scope == apiextensionsv1.NamespaceScoped,
|
||||
Kind: crd.Spec.Names.Kind,
|
||||
// TODO(@konsalex): Can this be dynamic ever?
|
||||
// Need to validate
|
||||
Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"},
|
||||
ShortNames: crd.Spec.Names.ShortNames,
|
||||
}
|
||||
|
||||
// Add status subresource if defined
|
||||
if crd.Spec.Versions[0].Subresources != nil && crd.Spec.Versions[0].Subresources.Status != nil {
|
||||
apiResource.Verbs = append(apiResource.Verbs, "update")
|
||||
}
|
||||
|
||||
// Check if resource already exists
|
||||
found := false
|
||||
for i, r := range resourceList.APIResources {
|
||||
if r.Name == apiResource.Name {
|
||||
resourceList.APIResources[i] = apiResource
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
resourceList.APIResources = append(resourceList.APIResources, apiResource)
|
||||
}
|
||||
}
|
||||
|
||||
// GetAPIGroupList returns the list of API groups for /apis discovery
|
||||
func (d *DiscoveryManager) GetAPIGroupList() *metav1.APIGroupList {
|
||||
d.mu.RLock()
|
||||
defer d.mu.RUnlock()
|
||||
|
||||
groups := make([]metav1.APIGroup, 0, len(d.apiGroups))
|
||||
for _, group := range d.apiGroups {
|
||||
groups = append(groups, *group)
|
||||
}
|
||||
|
||||
return &metav1.APIGroupList{
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
Kind: "APIGroupList",
|
||||
APIVersion: "v1",
|
||||
},
|
||||
Groups: groups,
|
||||
}
|
||||
}
|
||||
|
||||
// GetAPIGroup returns a specific API group
|
||||
func (d *DiscoveryManager) GetAPIGroup(name string) *metav1.APIGroup {
|
||||
d.mu.RLock()
|
||||
defer d.mu.RUnlock()
|
||||
|
||||
return d.apiGroups[name]
|
||||
}
|
||||
|
||||
// GetAPIResourceList returns the resource list for a specific group/version
|
||||
func (d *DiscoveryManager) GetAPIResourceList(gv schema.GroupVersion) *metav1.APIResourceList {
|
||||
d.mu.RLock()
|
||||
defer d.mu.RUnlock()
|
||||
|
||||
key := fmt.Sprintf("%s/%s", gv.Group, gv.Version)
|
||||
return d.resources[key]
|
||||
}
|
||||
|
||||
// ServeAPIGroup handles requests to /apis/<group>
|
||||
func (d *DiscoveryManager) ServeAPIGroup(w http.ResponseWriter, req *http.Request, group string) {
|
||||
apiGroup := d.GetAPIGroup(group)
|
||||
if apiGroup == nil {
|
||||
http.NotFound(w, req)
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(apiGroup)
|
||||
}
|
||||
|
||||
// ServeAPIResourceList handles requests to /apis/<group>/<version>
|
||||
func (d *DiscoveryManager) ServeAPIResourceList(w http.ResponseWriter, req *http.Request, gv schema.GroupVersion) {
|
||||
resourceList := d.GetAPIResourceList(gv)
|
||||
if resourceList == nil {
|
||||
http.NotFound(w, req)
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(resourceList)
|
||||
}
|
||||
451
pkg/registry/apis/apiextensions/dynamic_handler.go
Normal file
451
pkg/registry/apis/apiextensions/dynamic_handler.go
Normal file
@@ -0,0 +1,451 @@
|
||||
package apiextensions
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"mime"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/serializer"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
|
||||
"k8s.io/apiserver/pkg/endpoints/request"
|
||||
"k8s.io/apiserver/pkg/registry/rest"
|
||||
)
|
||||
|
||||
// DynamicCRHandler handles HTTP requests for dynamically registered custom resources
|
||||
type DynamicCRHandler struct {
|
||||
mu sync.RWMutex
|
||||
// Map of group -> version -> resource -> storage
|
||||
storageMap map[string]map[string]map[string]rest.Storage
|
||||
// Map of group -> version -> resource -> scope (Namespaced or Cluster)
|
||||
scopeMap map[string]map[string]map[string]apiextensionsv1.ResourceScope
|
||||
scheme *runtime.Scheme
|
||||
codecs serializer.CodecFactory
|
||||
}
|
||||
|
||||
// NewDynamicCRHandler creates a new dynamic custom resource handler
|
||||
func NewDynamicCRHandler(scheme *runtime.Scheme) *DynamicCRHandler {
|
||||
return &DynamicCRHandler{
|
||||
storageMap: make(map[string]map[string]map[string]rest.Storage),
|
||||
scheme: scheme,
|
||||
codecs: serializer.NewCodecFactory(scheme),
|
||||
}
|
||||
}
|
||||
|
||||
type registeredCR struct {
|
||||
storage rest.Storage
|
||||
scope apiextensionsv1.ResourceScope
|
||||
}
|
||||
|
||||
// RegisterCustomResource registers a custom resource storage for dynamic routing
|
||||
func (h *DynamicCRHandler) RegisterCustomResource(
|
||||
crd *apiextensionsv1.CustomResourceDefinition,
|
||||
version string,
|
||||
storage rest.Storage,
|
||||
) {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
|
||||
group := crd.Spec.Group
|
||||
resource := crd.Spec.Names.Plural
|
||||
|
||||
if h.storageMap[group] == nil {
|
||||
h.storageMap[group] = make(map[string]map[string]rest.Storage)
|
||||
}
|
||||
if h.storageMap[group][version] == nil {
|
||||
h.storageMap[group][version] = make(map[string]rest.Storage)
|
||||
}
|
||||
|
||||
h.storageMap[group][version][resource] = storage
|
||||
|
||||
// Store the scope information
|
||||
if h.scopeMap == nil {
|
||||
h.scopeMap = make(map[string]map[string]map[string]apiextensionsv1.ResourceScope)
|
||||
}
|
||||
if h.scopeMap[group] == nil {
|
||||
h.scopeMap[group] = make(map[string]map[string]apiextensionsv1.ResourceScope)
|
||||
}
|
||||
if h.scopeMap[group][version] == nil {
|
||||
h.scopeMap[group][version] = make(map[string]apiextensionsv1.ResourceScope)
|
||||
}
|
||||
h.scopeMap[group][version][resource] = crd.Spec.Scope
|
||||
|
||||
fmt.Printf("DynamicCRHandler: Registered %s/%s/%s (scope: %s)\n", group, version, resource, crd.Spec.Scope)
|
||||
}
|
||||
|
||||
// ServeHTTP handles HTTP requests for custom resources
|
||||
func (h *DynamicCRHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
// We could use a custom regex pattern, but RequestInfo is
|
||||
// already available and gives us the match info for free.
|
||||
requestInfo, ok := request.RequestInfoFrom(req.Context())
|
||||
if !ok || requestInfo == nil {
|
||||
http.NotFound(w, req)
|
||||
return
|
||||
}
|
||||
|
||||
group := requestInfo.APIGroup
|
||||
version := requestInfo.APIVersion
|
||||
namespace := requestInfo.Namespace
|
||||
resource := requestInfo.Resource
|
||||
name := requestInfo.Name
|
||||
|
||||
fmt.Printf("DynamicCRHandler: %s %s (group=%s, version=%s, resource=%s, namespace=%s, name=%s)\n",
|
||||
req.Method, req.URL.Path, group, version, resource, namespace, name)
|
||||
|
||||
// Look up the storage
|
||||
h.mu.RLock()
|
||||
versionMap, groupExists := h.storageMap[group]
|
||||
if !groupExists {
|
||||
h.mu.RUnlock()
|
||||
http.NotFound(w, req)
|
||||
return
|
||||
}
|
||||
|
||||
resourceMap, versionExists := versionMap[version]
|
||||
if !versionExists {
|
||||
h.mu.RUnlock()
|
||||
http.NotFound(w, req)
|
||||
return
|
||||
}
|
||||
|
||||
storage, resourceExists := resourceMap[resource]
|
||||
if !resourceExists {
|
||||
h.mu.RUnlock()
|
||||
http.NotFound(w, req)
|
||||
return
|
||||
}
|
||||
|
||||
// Check scope
|
||||
scope := h.scopeMap[group][version][resource]
|
||||
h.mu.RUnlock()
|
||||
|
||||
// Validate scope matches the request
|
||||
if scope == apiextensionsv1.NamespaceScoped && namespace == "" {
|
||||
http.Error(w, fmt.Sprintf("resource %s is namespace-scoped, must specify namespace", resource), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
if scope == apiextensionsv1.ClusterScoped && namespace != "" {
|
||||
http.Error(w, fmt.Sprintf("resource %s is cluster-scoped, cannot specify namespace", resource), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
// Add request info to context
|
||||
ctx := req.Context()
|
||||
ctx = request.WithNamespace(ctx, namespace)
|
||||
ctx = request.WithRequestInfo(ctx, &request.RequestInfo{
|
||||
IsResourceRequest: true,
|
||||
Path: req.URL.Path,
|
||||
Verb: strings.ToLower(req.Method),
|
||||
APIGroup: group,
|
||||
APIVersion: version,
|
||||
Namespace: namespace,
|
||||
Resource: resource,
|
||||
Name: name,
|
||||
})
|
||||
|
||||
// Handle the request based on the storage interface
|
||||
h.handleStorageRequest(ctx, w, req, storage, name, namespace)
|
||||
}
|
||||
|
||||
// handleStorageRequest dispatches to the appropriate storage method
|
||||
func (h *DynamicCRHandler) handleStorageRequest(
|
||||
ctx context.Context,
|
||||
w http.ResponseWriter,
|
||||
req *http.Request,
|
||||
storage rest.Storage,
|
||||
name string,
|
||||
namespace string,
|
||||
) {
|
||||
// Determine media type for response
|
||||
_, serializerInfo, err := negotiation.NegotiateOutputMediaType(req, h.codecs, negotiation.DefaultEndpointRestrictions)
|
||||
if err != nil {
|
||||
http.Error(w, fmt.Sprintf("failed to negotiate media type: %v", err), http.StatusNotAcceptable)
|
||||
return
|
||||
}
|
||||
serializer := serializerInfo.Serializer
|
||||
|
||||
switch req.Method {
|
||||
case http.MethodGet:
|
||||
if name == "" {
|
||||
// List operation
|
||||
if lister, ok := storage.(rest.Lister); ok {
|
||||
h.handleList(ctx, w, req, lister, serializer)
|
||||
} else {
|
||||
http.Error(w, "list not supported", http.StatusMethodNotAllowed)
|
||||
}
|
||||
} else {
|
||||
// Get operation
|
||||
if getter, ok := storage.(rest.Getter); ok {
|
||||
h.handleGet(ctx, w, req, getter, name, serializer)
|
||||
} else {
|
||||
http.Error(w, "get not supported", http.StatusMethodNotAllowed)
|
||||
}
|
||||
}
|
||||
|
||||
case http.MethodPost:
|
||||
// Create operation
|
||||
if creater, ok := storage.(rest.Creater); ok {
|
||||
h.handleCreate(ctx, w, req, creater, serializer)
|
||||
} else {
|
||||
http.Error(w, "create not supported", http.StatusMethodNotAllowed)
|
||||
}
|
||||
|
||||
case http.MethodPut:
|
||||
// Update operation
|
||||
if updater, ok := storage.(rest.Updater); ok {
|
||||
h.handleUpdate(ctx, w, req, updater, name, serializer)
|
||||
} else {
|
||||
http.Error(w, "update not supported", http.StatusMethodNotAllowed)
|
||||
}
|
||||
|
||||
case http.MethodPatch:
|
||||
// Patch operation
|
||||
if patcher, ok := storage.(rest.Patcher); ok {
|
||||
h.handlePatch(ctx, w, req, patcher, name, serializer)
|
||||
} else {
|
||||
http.Error(w, "patch not supported", http.StatusMethodNotAllowed)
|
||||
}
|
||||
|
||||
case http.MethodDelete:
|
||||
// Delete operation
|
||||
if deleter, ok := storage.(rest.GracefulDeleter); ok {
|
||||
h.handleDelete(ctx, w, req, deleter, name, serializer)
|
||||
} else {
|
||||
http.Error(w, "delete not supported", http.StatusMethodNotAllowed)
|
||||
}
|
||||
|
||||
default:
|
||||
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
|
||||
}
|
||||
}
|
||||
|
||||
// Helper methods for each operation
|
||||
func (h *DynamicCRHandler) handleList(ctx context.Context, w http.ResponseWriter, req *http.Request, lister rest.Lister, serializer runtime.Serializer) {
|
||||
// TODO(@konsalex): Parse list options from query parameters
|
||||
obj, err := lister.List(ctx, nil)
|
||||
if err != nil {
|
||||
http.Error(w, fmt.Sprintf("failed to list: %v", err), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
h.writeResponse(w, obj, serializer)
|
||||
}
|
||||
|
||||
func (h *DynamicCRHandler) handleGet(ctx context.Context, w http.ResponseWriter, req *http.Request, getter rest.Getter, name string, serializer runtime.Serializer) {
|
||||
// TODO(@konsalex): Parse get options
|
||||
obj, err := getter.Get(ctx, name, nil)
|
||||
if err != nil {
|
||||
http.Error(w, fmt.Sprintf("failed to get: %v", err), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
h.writeResponse(w, obj, serializer)
|
||||
}
|
||||
|
||||
func (h *DynamicCRHandler) handleCreate(ctx context.Context, w http.ResponseWriter, req *http.Request, creater rest.Creater, serializer runtime.Serializer) {
|
||||
// Read the request body
|
||||
body, err := io.ReadAll(req.Body)
|
||||
if err != nil {
|
||||
http.Error(w, fmt.Sprintf("failed to read body: %v", err), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
defer req.Body.Close()
|
||||
|
||||
fmt.Printf("DynamicCRHandler: handleCreate received body: %s\n", string(body))
|
||||
|
||||
// Decode the body
|
||||
decoder := h.codecs.UniversalDeserializer()
|
||||
obj, gvk, err := decoder.Decode(body, nil, nil)
|
||||
if err != nil {
|
||||
http.Error(w, fmt.Sprintf("failed to decode body: %v", err), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
fmt.Printf("DynamicCRHandler: Decoded object type: %T, GVK: %v\n", obj, gvk)
|
||||
|
||||
// Ensure we have an unstructured object
|
||||
if obj == nil {
|
||||
http.Error(w, "decoded object is nil", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
// Create the object
|
||||
fmt.Printf("DynamicCRHandler: Calling creater.Create...\n")
|
||||
created, err := creater.Create(ctx, obj, nil, nil)
|
||||
if err != nil {
|
||||
fmt.Printf("DynamicCRHandler: Create failed: %v\n", err)
|
||||
http.Error(w, fmt.Sprintf("failed to create: %v", err), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
fmt.Printf("DynamicCRHandler: Create succeeded, writing response\n")
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusCreated)
|
||||
if err := serializer.Encode(created, w); err != nil {
|
||||
fmt.Printf("DynamicCRHandler: Failed to encode response: %v\n", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (h *DynamicCRHandler) handleUpdate(ctx context.Context, w http.ResponseWriter, req *http.Request, updater rest.Updater, name string, serializer runtime.Serializer) {
|
||||
fmt.Printf("DynamicCRHandler: handleUpdate for resource: %s\n", name)
|
||||
|
||||
// Read the request body
|
||||
body, err := io.ReadAll(req.Body)
|
||||
if err != nil {
|
||||
http.Error(w, fmt.Sprintf("failed to read body: %v", err), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
defer req.Body.Close()
|
||||
|
||||
// Decode the body
|
||||
decoder := h.codecs.UniversalDeserializer()
|
||||
obj, gvk, err := decoder.Decode(body, nil, nil)
|
||||
if err != nil {
|
||||
http.Error(w, fmt.Sprintf("failed to decode body: %v", err), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
fmt.Printf("DynamicCRHandler: Decoded object type: %T, GVK: %v\n", obj, gvk)
|
||||
|
||||
// Create UpdatedObjectInfo
|
||||
objInfo := rest.DefaultUpdatedObjectInfo(obj)
|
||||
|
||||
// Update the object
|
||||
updated, created, err := updater.Update(ctx, name, objInfo, nil, nil, false, nil)
|
||||
if err != nil {
|
||||
fmt.Printf("DynamicCRHandler: Update failed: %v\n", err)
|
||||
if apierrors.IsNotFound(err) {
|
||||
http.Error(w, fmt.Sprintf("resource not found: %v", err), http.StatusNotFound)
|
||||
} else {
|
||||
http.Error(w, fmt.Sprintf("failed to update: %v", err), http.StatusInternalServerError)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
if created {
|
||||
w.WriteHeader(http.StatusCreated)
|
||||
} else {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}
|
||||
|
||||
fmt.Printf("DynamicCRHandler: Update succeeded\n")
|
||||
if err := serializer.Encode(updated, w); err != nil {
|
||||
fmt.Printf("DynamicCRHandler: Failed to encode response: %v\n", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (h *DynamicCRHandler) handlePatch(ctx context.Context, w http.ResponseWriter, req *http.Request, patcher rest.Patcher, name string, serializer runtime.Serializer) {
|
||||
// rest.Patcher is actually just Getter + Updater
|
||||
// We need to get the existing object, apply the patch, then update
|
||||
|
||||
// Read the request body
|
||||
patchBytes, err := io.ReadAll(req.Body)
|
||||
if err != nil {
|
||||
http.Error(w, fmt.Sprintf("failed to read body: %v", err), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
defer req.Body.Close()
|
||||
|
||||
// Get the existing object to verify it exists
|
||||
_, err = patcher.Get(ctx, name, nil)
|
||||
if err != nil {
|
||||
if apierrors.IsNotFound(err) {
|
||||
http.Error(w, fmt.Sprintf("resource not found: %v", err), http.StatusNotFound)
|
||||
} else {
|
||||
http.Error(w, fmt.Sprintf("failed to get resource: %v", err), http.StatusInternalServerError)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Determine patch type from Content-Type header
|
||||
// Using mime here as content type might be like:
|
||||
// "Content-Type: application/json; charset=utf-8"
|
||||
contentType, _, err := mime.ParseMediaType(req.Header.Get("Content-Type"))
|
||||
if err != nil {
|
||||
http.Error(w, fmt.Sprintf("error parsing Content-Type: %s", contentType), http.StatusUnsupportedMediaType)
|
||||
return
|
||||
}
|
||||
|
||||
var patchType types.PatchType
|
||||
|
||||
switch contentType {
|
||||
case string(types.JSONPatchType):
|
||||
patchType = types.JSONPatchType
|
||||
case string(types.MergePatchType):
|
||||
patchType = types.MergePatchType
|
||||
case string(types.StrategicMergePatchType):
|
||||
patchType = types.StrategicMergePatchType
|
||||
case "application/json":
|
||||
// Default to merge patch for plain JSON
|
||||
// We cannot fall back to Strategic as we miss
|
||||
// Go structs for dynamic CRDs
|
||||
patchType = types.MergePatchType
|
||||
default:
|
||||
http.Error(w, fmt.Sprintf("unsupported Content-Type: %s", contentType), http.StatusUnsupportedMediaType)
|
||||
return
|
||||
}
|
||||
|
||||
// Apply the patch via our custom storage (which has the Patch method)
|
||||
if storage, ok := patcher.(*customResourceStorage); ok {
|
||||
patched, err := storage.Patch(ctx, name, patchType, patchBytes, nil)
|
||||
if err != nil {
|
||||
if apierrors.IsNotFound(err) {
|
||||
http.Error(w, fmt.Sprintf("resource not found: %v", err), http.StatusNotFound)
|
||||
} else {
|
||||
http.Error(w, fmt.Sprintf("failed to patch: %v", err), http.StatusInternalServerError)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
if err := serializer.Encode(patched, w); err != nil {
|
||||
fmt.Printf("DynamicCRHandler: Failed to encode response: %v\n", err)
|
||||
}
|
||||
} else {
|
||||
http.Error(w, "patch not supported for this resource type", http.StatusNotImplemented)
|
||||
}
|
||||
}
|
||||
|
||||
func (h *DynamicCRHandler) handleDelete(ctx context.Context, w http.ResponseWriter, req *http.Request, deleter rest.GracefulDeleter, name string, serializer runtime.Serializer) {
|
||||
// Parse delete options from query/body
|
||||
deleteOptions := &metav1.DeleteOptions{}
|
||||
|
||||
// Delete the object
|
||||
obj, deleted, err := deleter.Delete(ctx, name, nil, deleteOptions)
|
||||
if err != nil {
|
||||
if apierrors.IsNotFound(err) {
|
||||
http.Error(w, fmt.Sprintf("resource not found: %v", err), http.StatusNotFound)
|
||||
} else {
|
||||
http.Error(w, fmt.Sprintf("failed to delete: %v", err), http.StatusInternalServerError)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
if deleted {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
} else {
|
||||
// Return 202 Accepted if deletion is pending (e.g., finalizers)
|
||||
w.WriteHeader(http.StatusAccepted)
|
||||
}
|
||||
if err := serializer.Encode(obj, w); err != nil {
|
||||
fmt.Printf("DynamicCRHandler: Failed to encode response: %v\n", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (h *DynamicCRHandler) writeResponse(w http.ResponseWriter, obj runtime.Object, serializer runtime.Serializer) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
if err := serializer.Encode(obj, w); err != nil {
|
||||
http.Error(w, fmt.Sprintf("failed to encode response: %v", err), http.StatusInternalServerError)
|
||||
}
|
||||
}
|
||||
452
pkg/registry/apis/apiextensions/dynamic_registry.go
Normal file
452
pkg/registry/apis/apiextensions/dynamic_registry.go
Normal file
@@ -0,0 +1,452 @@
|
||||
package apiextensions
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"sync"
|
||||
|
||||
authlib "github.com/grafana/authlib/types"
|
||||
apidiscoveryv2 "k8s.io/api/apidiscovery/v2"
|
||||
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
|
||||
"k8s.io/apiextensions-apiserver/pkg/controller/openapi/builder"
|
||||
apiextensionsfeatures "k8s.io/apiextensions-apiserver/pkg/features"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apiserver/pkg/endpoints/discovery/aggregated"
|
||||
"k8s.io/apiserver/pkg/registry/generic"
|
||||
genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
|
||||
"k8s.io/apiserver/pkg/registry/rest"
|
||||
genericapiserver "k8s.io/apiserver/pkg/server"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/kube-openapi/pkg/spec3"
|
||||
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resource"
|
||||
)
|
||||
|
||||
// DynamicRegistry manages the dynamic registration and unregistration of custom resources
|
||||
type DynamicRegistry struct {
|
||||
scheme *runtime.Scheme
|
||||
optsGetter generic.RESTOptionsGetter
|
||||
unifiedClient resource.ResourceClient
|
||||
apiGroupInfo *genericapiserver.APIGroupInfo
|
||||
accessClient authlib.AccessClient
|
||||
server *genericapiserver.GenericAPIServer // API server to install new groups
|
||||
discoveryManager *DiscoveryManager
|
||||
|
||||
mu sync.RWMutex
|
||||
registrations map[string]*customResourceRegistration // key: group/version/resource
|
||||
apiGroups map[string]*genericapiserver.APIGroupInfo // key: group name
|
||||
|
||||
// openAPISpecs caches OpenAPI specs per GroupVersion and CRD Name
|
||||
openAPISpecs map[schema.GroupVersion]map[string]*spec3.OpenAPI
|
||||
}
|
||||
|
||||
type customResourceRegistration struct {
|
||||
crd *apiextensionsv1.CustomResourceDefinition
|
||||
storage *customResourceStorage
|
||||
}
|
||||
|
||||
// NewDynamicRegistry creates a new dynamic registry
|
||||
func NewDynamicRegistry(
|
||||
scheme *runtime.Scheme,
|
||||
optsGetter generic.RESTOptionsGetter,
|
||||
apiGroupInfo *genericapiserver.APIGroupInfo,
|
||||
accessClient authlib.AccessClient,
|
||||
) *DynamicRegistry {
|
||||
return &DynamicRegistry{
|
||||
scheme: scheme,
|
||||
optsGetter: optsGetter,
|
||||
apiGroupInfo: apiGroupInfo,
|
||||
accessClient: accessClient,
|
||||
discoveryManager: NewDiscoveryManager(),
|
||||
registrations: make(map[string]*customResourceRegistration),
|
||||
apiGroups: make(map[string]*genericapiserver.APIGroupInfo),
|
||||
openAPISpecs: make(map[schema.GroupVersion]map[string]*spec3.OpenAPI),
|
||||
}
|
||||
}
|
||||
|
||||
// SetAPIServer sets the API server for dynamic group installation
|
||||
func (r *DynamicRegistry) SetAPIServer(server *genericapiserver.GenericAPIServer) {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
r.server = server
|
||||
|
||||
// Register discovery handlers for any API groups that were registered before we had the server reference
|
||||
for groupName := range r.apiGroups {
|
||||
// Find the version for this group
|
||||
for _, reg := range r.registrations {
|
||||
if reg.crd.Spec.Group == groupName {
|
||||
versionName := reg.crd.Spec.Versions[0].Name
|
||||
r.registerDiscoveryHandlers(groupName, versionName)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Wrap the main /apis handler to include our custom groups
|
||||
r.registerAllWithAggregatedDiscovery()
|
||||
|
||||
// Note: OpenAPI registration is deferred to a PostStartHook because
|
||||
// OpenAPIV3VersionedService is only available after PrepareRun()
|
||||
}
|
||||
|
||||
// RegisterOpenAPIForExistingCRDs registers OpenAPI specs for all existing CRDs
|
||||
// This is called from a PostStartHook after the server is fully prepared
|
||||
func (r *DynamicRegistry) RegisterOpenAPIForExistingCRDs() error {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
|
||||
// Register OpenAPI specs for all CRDs that were registered before the server was fully prepared
|
||||
for _, reg := range r.registrations {
|
||||
if err := r.updateOpenAPISpecLocked(reg.crd); err != nil {
|
||||
fmt.Printf("Warning: failed to update OpenAPI spec for CRD %s: %v\n", reg.crd.Name, err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// registerDiscoveryHandlers registers HTTP handlers for discovery endpoints
|
||||
func (r *DynamicRegistry) registerDiscoveryHandlers(group, version string) {
|
||||
if r.server == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Register /apis/<group> handler
|
||||
groupPath := fmt.Sprintf("/apis/%s", group)
|
||||
r.server.Handler.NonGoRestfulMux.HandleFunc(groupPath, func(w http.ResponseWriter, req *http.Request) {
|
||||
r.discoveryManager.ServeAPIGroup(w, req, group)
|
||||
})
|
||||
// DEBUG
|
||||
fmt.Printf("Registered discovery handler: %s\n", groupPath)
|
||||
|
||||
// Register /apis/<group>/<version> handler
|
||||
gv := schema.GroupVersion{Group: group, Version: version}
|
||||
versionPath := fmt.Sprintf("/apis/%s/%s", group, version)
|
||||
r.server.Handler.NonGoRestfulMux.HandleFunc(versionPath, func(w http.ResponseWriter, req *http.Request) {
|
||||
r.discoveryManager.ServeAPIResourceList(w, req, gv)
|
||||
})
|
||||
fmt.Printf("Registered discovery handler: %s\n", versionPath)
|
||||
}
|
||||
|
||||
// registerAllWithAggregatedDiscovery registers all custom groups with the server's aggregated discovery manager
|
||||
func (r *DynamicRegistry) registerAllWithAggregatedDiscovery() {
|
||||
if r.server == nil || r.server.AggregatedDiscoveryGroupManager == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Register each custom group with the discovery manager
|
||||
for _, apiGroup := range r.discoveryManager.apiGroups {
|
||||
for _, gvDiscovery := range apiGroup.Versions {
|
||||
r.registerWithAggregatedDiscovery(apiGroup.Name, gvDiscovery.Version)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// registerWithAggregatedDiscovery registers a specific group version with the aggregated discovery manager
|
||||
func (r *DynamicRegistry) registerWithAggregatedDiscovery(groupName, versionName string) {
|
||||
if r.server == nil || r.server.AggregatedDiscoveryGroupManager == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Get the resources for this version
|
||||
gvKey := fmt.Sprintf("%s/%s", groupName, versionName)
|
||||
resourceList := r.discoveryManager.resources[gvKey]
|
||||
|
||||
if resourceList != nil {
|
||||
// Convert our metav1.APIResourceList to apidiscoveryv2.APIVersionDiscovery
|
||||
apiResources := make([]apidiscoveryv2.APIResourceDiscovery, 0, len(resourceList.APIResources))
|
||||
for _, res := range resourceList.APIResources {
|
||||
apiRes := apidiscoveryv2.APIResourceDiscovery{
|
||||
Resource: res.Name,
|
||||
ResponseKind: &metav1.GroupVersionKind{
|
||||
Group: groupName,
|
||||
Version: versionName,
|
||||
Kind: res.Kind,
|
||||
},
|
||||
Scope: getScopeType(res.Namespaced),
|
||||
ShortNames: res.ShortNames,
|
||||
Verbs: res.Verbs,
|
||||
}
|
||||
apiResources = append(apiResources, apiRes)
|
||||
}
|
||||
|
||||
versionDiscovery := apidiscoveryv2.APIVersionDiscovery{
|
||||
Version: versionName,
|
||||
Resources: apiResources,
|
||||
}
|
||||
|
||||
// Create a manager with CRD source
|
||||
manager := r.server.AggregatedDiscoveryGroupManager.WithSource(aggregated.CRDSource)
|
||||
|
||||
// Add group version
|
||||
manager.AddGroupVersion(
|
||||
groupName,
|
||||
versionDiscovery,
|
||||
)
|
||||
|
||||
// Set priority to ensure it's discoverable
|
||||
gv := metav1.GroupVersion{
|
||||
Group: groupName,
|
||||
Version: versionName,
|
||||
}
|
||||
manager.SetGroupVersionPriority(gv, 1000, 100)
|
||||
|
||||
fmt.Printf("✅ Registered %s with AggregatedDiscoveryGroupManager (Source: CRD)\n", gvKey)
|
||||
}
|
||||
}
|
||||
|
||||
// getScopeType converts boolean namespaced flag to apidiscoveryv2 scope type
|
||||
func getScopeType(namespaced bool) apidiscoveryv2.ResourceScope {
|
||||
if namespaced {
|
||||
return apidiscoveryv2.ScopeNamespace
|
||||
}
|
||||
return apidiscoveryv2.ScopeCluster
|
||||
}
|
||||
|
||||
// Start begins watching CRDs for dynamic registration
|
||||
func (r *DynamicRegistry) Start(ctx context.Context, crdStore *genericregistry.Store) {
|
||||
// Currently we don't watch for changes
|
||||
// CRDs are loaded during UpdateAPIGroupInfo before the server starts
|
||||
// We need to implement proper watching mechanism here.
|
||||
// TODO(@konsalex): Watch for CRD changes and call RegisterCRD/UpdateCRD/UnregisterCRD
|
||||
<-ctx.Done()
|
||||
}
|
||||
|
||||
// RegisterCRD registers a custom resource dynamically based on the CRD spec
|
||||
func (r *DynamicRegistry) RegisterCRD(crd *apiextensionsv1.CustomResourceDefinition) error {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
|
||||
// TODO(@konsalex): Support multiple versions
|
||||
if len(crd.Spec.Versions) != 1 {
|
||||
return fmt.Errorf("only single-version CRDs are supported")
|
||||
}
|
||||
|
||||
version := crd.Spec.Versions[0]
|
||||
|
||||
// Create storage for the custom resource
|
||||
crStorage, err := NewCustomResourceStorage(
|
||||
crd,
|
||||
version.Name,
|
||||
r.scheme,
|
||||
r.optsGetter,
|
||||
r.accessClient,
|
||||
r.unifiedClient,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create custom resource storage: %w", err)
|
||||
}
|
||||
|
||||
// Get or create API group info for this custom resource's group
|
||||
group := crd.Spec.Group
|
||||
apiGroupInfo, ok := r.apiGroups[group]
|
||||
isNewGroup := !ok
|
||||
if !ok {
|
||||
// Create a simple API group info without using NewDefaultAPIGroupInfo
|
||||
// to avoid OpenAPI validation issues with unstructured types
|
||||
gvInfo := genericapiserver.APIGroupInfo{
|
||||
PrioritizedVersions: []schema.GroupVersion{{Group: group, Version: version.Name}},
|
||||
VersionedResourcesStorageMap: make(map[string]map[string]rest.Storage),
|
||||
Scheme: r.scheme,
|
||||
NegotiatedSerializer: r.apiGroupInfo.NegotiatedSerializer,
|
||||
ParameterCodec: r.apiGroupInfo.ParameterCodec,
|
||||
}
|
||||
|
||||
r.apiGroups[group] = &gvInfo
|
||||
apiGroupInfo = &gvInfo
|
||||
}
|
||||
|
||||
// Get or create the storage map for this version
|
||||
storageMap, ok := apiGroupInfo.VersionedResourcesStorageMap[version.Name]
|
||||
if !ok {
|
||||
storageMap = make(map[string]rest.Storage)
|
||||
apiGroupInfo.VersionedResourcesStorageMap[version.Name] = storageMap
|
||||
|
||||
// Update prioritized versions if this is a new version
|
||||
found := false
|
||||
for _, gv := range apiGroupInfo.PrioritizedVersions {
|
||||
if gv.Version == version.Name {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
apiGroupInfo.PrioritizedVersions = append(apiGroupInfo.PrioritizedVersions,
|
||||
schema.GroupVersion{Group: group, Version: version.Name})
|
||||
}
|
||||
}
|
||||
|
||||
// Register the custom resource storage
|
||||
resourcePath := crd.Spec.Names.Plural
|
||||
storageMap[resourcePath] = crStorage
|
||||
|
||||
// Register status subresource if defined
|
||||
// TODO(@konsalex): Implement status subresource for direct storage
|
||||
// if version.Subresources != nil && version.Subresources.Status != nil {
|
||||
// storageMap[resourcePath+"/status"] = crStorage
|
||||
// }
|
||||
|
||||
// Store the registration
|
||||
key := fmt.Sprintf("%s/%s/%s", crd.Spec.Group, version.Name, crd.Spec.Names.Plural)
|
||||
r.registrations[key] = &customResourceRegistration{
|
||||
crd: crd.DeepCopy(),
|
||||
storage: crStorage,
|
||||
}
|
||||
|
||||
// Add to discovery manager
|
||||
// (this will make kubectl work properly)
|
||||
r.discoveryManager.AddCustomResource(crd)
|
||||
|
||||
// Register discovery HTTP handlers if we have the server
|
||||
if r.server != nil && isNewGroup {
|
||||
r.registerDiscoveryHandlers(group, version.Name)
|
||||
// Also register with aggregated discovery manager
|
||||
r.registerWithAggregatedDiscovery(group, version.Name)
|
||||
}
|
||||
|
||||
// Update OpenAPI spec
|
||||
if err := r.updateOpenAPISpecLocked(crd); err != nil {
|
||||
fmt.Printf("Warning: failed to update OpenAPI spec for CRD %s: %v\n", crd.Name, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpdateCRD updates a custom resource registration
|
||||
func (r *DynamicRegistry) UpdateCRD(crd *apiextensionsv1.CustomResourceDefinition) error {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
|
||||
// Hack for now, we just un-register it and the re-register it.
|
||||
// Not sure if there is any drawback
|
||||
if err := r.unregisterCRDLocked(crd); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return r.RegisterCRD(crd)
|
||||
}
|
||||
|
||||
// UnregisterCRD removes a custom resource registration
|
||||
func (r *DynamicRegistry) UnregisterCRD(crd *apiextensionsv1.CustomResourceDefinition) error {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
|
||||
return r.unregisterCRDLocked(crd)
|
||||
}
|
||||
|
||||
func (r *DynamicRegistry) unregisterCRDLocked(crd *apiextensionsv1.CustomResourceDefinition) error {
|
||||
if len(crd.Spec.Versions) != 1 {
|
||||
return fmt.Errorf("only single-version CRDs are supported")
|
||||
}
|
||||
|
||||
version := crd.Spec.Versions[0]
|
||||
key := fmt.Sprintf("%s/%s/%s", crd.Spec.Group, version.Name, crd.Spec.Names.Plural)
|
||||
|
||||
// Remove from registrations
|
||||
delete(r.registrations, key)
|
||||
|
||||
// Remove from API group info storage map
|
||||
if storageMap, ok := r.apiGroupInfo.VersionedResourcesStorageMap[version.Name]; ok {
|
||||
resourcePath := crd.Spec.Names.Plural
|
||||
delete(storageMap, resourcePath)
|
||||
delete(storageMap, resourcePath+"/status")
|
||||
}
|
||||
|
||||
// Remove from OpenAPI specs
|
||||
gv := schema.GroupVersion{Group: crd.Spec.Group, Version: version.Name}
|
||||
if specs, ok := r.openAPISpecs[gv]; ok {
|
||||
delete(specs, crd.Name)
|
||||
if len(specs) == 0 {
|
||||
delete(r.openAPISpecs, gv)
|
||||
// Remove from service
|
||||
if r.server != nil && r.server.OpenAPIV3VersionedService != nil {
|
||||
path := fmt.Sprintf("apis/%s/%s", gv.Group, gv.Version)
|
||||
r.server.OpenAPIV3VersionedService.DeleteGroupVersion(path)
|
||||
}
|
||||
} else {
|
||||
// Update with remaining specs
|
||||
if err := r.updateGroupVersionOpenAPILocked(gv); err != nil {
|
||||
return fmt.Errorf("failed to update OpenAPI spec after unregistering CRD: %w", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetRegistration returns the registration for a custom resource
|
||||
func (r *DynamicRegistry) GetRegistration(group, version, resource string) (*customResourceRegistration, bool) {
|
||||
r.mu.RLock()
|
||||
defer r.mu.RUnlock()
|
||||
|
||||
key := fmt.Sprintf("%s/%s/%s", group, version, resource)
|
||||
reg, ok := r.registrations[key]
|
||||
return reg, ok
|
||||
}
|
||||
|
||||
// updateOpenAPISpecLocked builds and updates the OpenAPI spec for the given CRD
|
||||
// mu must be held by caller
|
||||
func (r *DynamicRegistry) updateOpenAPISpecLocked(crd *apiextensionsv1.CustomResourceDefinition) error {
|
||||
if r.server == nil || r.server.OpenAPIV3VersionedService == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, v := range crd.Spec.Versions {
|
||||
if !v.Served {
|
||||
continue
|
||||
}
|
||||
|
||||
// Build OpenAPI V3 spec
|
||||
spec, err := builder.BuildOpenAPIV3(crd, v.Name, builder.Options{
|
||||
V2: false,
|
||||
IncludeSelectableFields: utilfeature.DefaultFeatureGate.Enabled(apiextensionsfeatures.CustomResourceFieldSelectors),
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to build OpenAPI V3 spec: %w", err)
|
||||
}
|
||||
|
||||
gv := schema.GroupVersion{Group: crd.Spec.Group, Version: v.Name}
|
||||
if r.openAPISpecs[gv] == nil {
|
||||
r.openAPISpecs[gv] = make(map[string]*spec3.OpenAPI)
|
||||
}
|
||||
r.openAPISpecs[gv][crd.Name] = spec
|
||||
|
||||
// Update the group version spec in the service
|
||||
if err := r.updateGroupVersionOpenAPILocked(gv); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// updateGroupVersionOpenAPILocked merges all specs for a GV and updates the service
|
||||
// mu must be held by caller
|
||||
func (r *DynamicRegistry) updateGroupVersionOpenAPILocked(gv schema.GroupVersion) error {
|
||||
if r.server == nil || r.server.OpenAPIV3VersionedService == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
specsMap := r.openAPISpecs[gv]
|
||||
if len(specsMap) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
var specs []*spec3.OpenAPI
|
||||
for _, spec := range specsMap {
|
||||
specs = append(specs, spec)
|
||||
}
|
||||
|
||||
mergedSpec, err := builder.MergeSpecsV3(specs...)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to merge specs: %w", err)
|
||||
}
|
||||
|
||||
path := fmt.Sprintf("apis/%s/%s", gv.Group, gv.Version)
|
||||
r.server.OpenAPIV3VersionedService.UpdateGroupVersion(path, mergedSpec)
|
||||
|
||||
return nil
|
||||
}
|
||||
309
pkg/registry/apis/apiextensions/register.go
Normal file
309
pkg/registry/apis/apiextensions/register.go
Normal file
@@ -0,0 +1,309 @@
|
||||
package apiextensions
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
|
||||
apiextensionsopenapi "k8s.io/apiextensions-apiserver/pkg/generated/openapi"
|
||||
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apiserver/pkg/authorization/authorizer"
|
||||
genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
|
||||
"k8s.io/apiserver/pkg/registry/rest"
|
||||
genericapiserver "k8s.io/apiserver/pkg/server"
|
||||
"k8s.io/kube-openapi/pkg/common"
|
||||
|
||||
authlib "github.com/grafana/authlib/types"
|
||||
|
||||
"github.com/grafana/grafana/pkg/apimachinery/identity"
|
||||
"github.com/grafana/grafana/pkg/apimachinery/utils"
|
||||
grafanaregistry "github.com/grafana/grafana/pkg/apiserver/registry/generic"
|
||||
grafanaauthorizer "github.com/grafana/grafana/pkg/services/apiserver/auth/authorizer"
|
||||
"github.com/grafana/grafana/pkg/services/apiserver/builder"
|
||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/apistore"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resource"
|
||||
)
|
||||
|
||||
var _ builder.APIGroupBuilder = (*APIExtensionsBuilder)(nil)
|
||||
|
||||
// APIExtensionsBuilder implements builder.APIGroupBuilder for CustomResourceDefinitions
|
||||
type APIExtensionsBuilder struct {
|
||||
features featuremgmt.FeatureToggles
|
||||
storage *genericregistry.Store
|
||||
accessClient authlib.AccessClient
|
||||
dynamicReg *DynamicRegistry
|
||||
restOptGetter *apistore.RESTOptionsGetter
|
||||
apiregistrar builder.APIRegistrar
|
||||
unifiedClient resource.ResourceClient
|
||||
preloadedCRDs []*apiextensionsv1.CustomResourceDefinition // CRDs loaded before init
|
||||
dynamicHandler *DynamicCRHandler // Dynamic handler for custom resources
|
||||
server *genericapiserver.GenericAPIServer // The running API server
|
||||
}
|
||||
|
||||
// SetAPIServer sets the API server instance
|
||||
// This allows the builder to register dynamic API groups (CRDs)
|
||||
func (b *APIExtensionsBuilder) SetAPIServer(server *genericapiserver.GenericAPIServer) {
|
||||
b.server = server
|
||||
if b.dynamicReg != nil {
|
||||
b.dynamicReg.SetAPIServer(server)
|
||||
}
|
||||
|
||||
// Register a PostStartHook to register OpenAPI specs after the server is fully prepared
|
||||
// This is necessary because OpenAPIV3VersionedService is only available after PrepareRun()
|
||||
server.AddPostStartHookOrDie("apiextensions-openapi", func(context genericapiserver.PostStartHookContext) error {
|
||||
if b.dynamicReg != nil {
|
||||
return b.dynamicReg.RegisterOpenAPIForExistingCRDs()
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// RegisterAPIService registers the apiextensions API group in single-tenant mode
|
||||
func RegisterAPIService(
|
||||
cfg *setting.Cfg,
|
||||
features featuremgmt.FeatureToggles,
|
||||
apiregistration builder.APIRegistrar,
|
||||
accessClient authlib.AccessClient,
|
||||
registerer prometheus.Registerer,
|
||||
unified resource.ResourceClient,
|
||||
) (*APIExtensionsBuilder, error) {
|
||||
if !features.IsEnabledGlobally(featuremgmt.FlagApiExtensions) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
b := &APIExtensionsBuilder{
|
||||
features: features,
|
||||
accessClient: accessClient,
|
||||
apiregistrar: apiregistration,
|
||||
unifiedClient: unified,
|
||||
}
|
||||
|
||||
// Register the CRD API group
|
||||
apiregistration.RegisterAPI(b)
|
||||
|
||||
// NOTE: We can't load CRDs here because unified storage isn't fully initialized yet
|
||||
// We'll use a PostStartHook instead to load CRDs after the server is running
|
||||
// See the postStartHook field and RegisterPostStartHooks method below
|
||||
|
||||
return b, nil
|
||||
}
|
||||
|
||||
// The default authorizer is fine because authorization happens in storage where we know the parent folder
|
||||
func (b *APIExtensionsBuilder) GetAuthorizer() authorizer.Authorizer {
|
||||
return grafanaauthorizer.NewServiceAuthorizer()
|
||||
}
|
||||
|
||||
// loadAndRegisterCRDsWithDynamicHandler loads CRDs from storage and registers their handlers
|
||||
// This is called during UpdateAPIGroupInfo when storage IS ready
|
||||
func (b *APIExtensionsBuilder) loadAndRegisterCRDsWithDynamicHandler(
|
||||
ctx context.Context,
|
||||
crdStore *genericregistry.Store,
|
||||
opts builder.APIGroupOptions,
|
||||
) error {
|
||||
// TODO(@konsalex): Have a conditional check here for MT
|
||||
// For ST we can use the identity.WithServiceIdentityContext
|
||||
// for MT we can use implicitly use a service token:
|
||||
// https://github.com/grafana/kube-manifests/blob/9f5e409c72fef4f831b173480131121e9cb348a3/flux/dev-us-east-0/grafana-iam/Deployment-iam-grafana-app-main.yaml#L114C9-L114C59
|
||||
// SO we can skip systemCtx creation
|
||||
systemCtx := identity.WithServiceIdentityContext(context.WithoutCancel(ctx), 1)
|
||||
|
||||
// List all CRDs using the initialized storage
|
||||
listObj, err := crdStore.List(systemCtx, &metainternalversion.ListOptions{})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to list CRDs: %w", err)
|
||||
}
|
||||
|
||||
crdList, ok := listObj.(*apiextensionsv1.CustomResourceDefinitionList)
|
||||
if !ok {
|
||||
return fmt.Errorf("unexpected list type: %T", listObj)
|
||||
}
|
||||
|
||||
if len(crdList.Items) == 0 {
|
||||
fmt.Println("No existing CRDs found in storage")
|
||||
return nil
|
||||
}
|
||||
|
||||
fmt.Printf("Found %d CRDs in storage, registering with dynamic handler...\n", len(crdList.Items))
|
||||
|
||||
// For each CRD, create storage and register it with the dynamic handler
|
||||
for i := range crdList.Items {
|
||||
crd := &crdList.Items[i]
|
||||
|
||||
fmt.Printf(" - Processing CRD: %s (group: %s, version: %s, resource: %s)\n",
|
||||
crd.Name, crd.Spec.Group, crd.Spec.Versions[0].Name, crd.Spec.Names.Plural)
|
||||
|
||||
// Support only single version for now
|
||||
if len(crd.Spec.Versions) != 1 {
|
||||
fmt.Printf(" Warning: only single-version CRDs supported, skipping %s\n", crd.Name)
|
||||
continue
|
||||
}
|
||||
|
||||
version := crd.Spec.Versions[0]
|
||||
|
||||
// Create storage for this custom resource
|
||||
crStorage, err := NewCustomResourceStorage(
|
||||
crd,
|
||||
version.Name,
|
||||
opts.Scheme,
|
||||
opts.OptsGetter,
|
||||
b.accessClient,
|
||||
b.unifiedClient,
|
||||
)
|
||||
if err != nil {
|
||||
fmt.Printf(" Warning: failed to create storage for CRD %s: %v\n", crd.Name, err)
|
||||
continue
|
||||
}
|
||||
|
||||
// Register with the dynamic handler (for HTTP routing)
|
||||
b.dynamicHandler.RegisterCustomResource(crd, version.Name, crStorage)
|
||||
|
||||
// Register with the dynamic registry (for API discovery)
|
||||
fmt.Printf(" DEBUG: Registering CRD with dynamic registry (b.dynamicReg=%v)...\n", b.dynamicReg != nil)
|
||||
if err := b.dynamicReg.RegisterCRD(crd); err != nil {
|
||||
fmt.Printf(" Warning: failed to register CRD with dynamic registry: %v\n", err)
|
||||
} else {
|
||||
fmt.Printf(" ✓ Registered CRD with dynamic registry\n")
|
||||
}
|
||||
|
||||
fmt.Printf(" ✓ Registered with dynamic handler: %s/%s/%s\n",
|
||||
crd.Spec.Group, version.Name, crd.Spec.Names.Plural)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// NewAPIService creates an APIExtensionsBuilder for multi-tenant mode
|
||||
// TODO(@konsalex): NOT YET IMPLEMENTED properly
|
||||
func NewAPIService(
|
||||
accessClient authlib.AccessClient,
|
||||
unified resource.ResourceClient,
|
||||
registerer prometheus.Registerer,
|
||||
features featuremgmt.FeatureToggles,
|
||||
) (*APIExtensionsBuilder, error) {
|
||||
return &APIExtensionsBuilder{
|
||||
features: features,
|
||||
accessClient: accessClient,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (b *APIExtensionsBuilder) GetGroupVersion() schema.GroupVersion {
|
||||
return apiextensionsv1.SchemeGroupVersion
|
||||
}
|
||||
|
||||
func (b *APIExtensionsBuilder) InstallSchema(scheme *runtime.Scheme) error {
|
||||
gv := b.GetGroupVersion()
|
||||
|
||||
// We don't register CRD types with AddKnownTypes here because:
|
||||
// 1. The external k8s.io/apiextensions-apiserver types don't have openapi-gen annotations
|
||||
// 2. This would cause the API server to try to generate OpenAPI specs for them
|
||||
// 3. We register them at the storage level in UpdateAPIGroupInfo instead
|
||||
|
||||
// We only need to add the metav1 types to this group version
|
||||
metav1.AddToGroupVersion(scheme, gv)
|
||||
return scheme.SetVersionPriority(gv)
|
||||
}
|
||||
|
||||
func (b *APIExtensionsBuilder) AllowedV0Alpha1Resources() []string {
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetDynamicHandler returns the dynamic custom resource handler
|
||||
// This can be used to install it as a fallback handler in the HTTP server
|
||||
func (b *APIExtensionsBuilder) GetDynamicHandler() http.Handler {
|
||||
if b.dynamicHandler == nil {
|
||||
return nil
|
||||
}
|
||||
return b.dynamicHandler
|
||||
}
|
||||
|
||||
func (b *APIExtensionsBuilder) UpdateAPIGroupInfo(
|
||||
apiGroupInfo *genericapiserver.APIGroupInfo,
|
||||
opts builder.APIGroupOptions,
|
||||
) error {
|
||||
// Register storage options for CRDs
|
||||
opts.StorageOptsRegister(
|
||||
schema.GroupResource{Group: apiextensionsv1.GroupName, Resource: "customresourcedefinitions"},
|
||||
apistore.StorageOptions{},
|
||||
)
|
||||
|
||||
// Register the CRD types directly with the scheme now (at storage registration time)
|
||||
// This avoids OpenAPI generation issues during schema installation
|
||||
gv := b.GetGroupVersion()
|
||||
opts.Scheme.AddKnownTypes(gv,
|
||||
&apiextensionsv1.CustomResourceDefinition{},
|
||||
&apiextensionsv1.CustomResourceDefinitionList{},
|
||||
)
|
||||
|
||||
// Create the main CRD storage
|
||||
crdResourceInfo := utils.NewResourceInfo(
|
||||
apiextensionsv1.GroupName,
|
||||
"v1",
|
||||
"customresourcedefinitions",
|
||||
"customresourcedefinition",
|
||||
"CustomResourceDefinition",
|
||||
func() runtime.Object { return &apiextensionsv1.CustomResourceDefinition{} },
|
||||
func() runtime.Object { return &apiextensionsv1.CustomResourceDefinitionList{} },
|
||||
utils.TableColumns{},
|
||||
)
|
||||
crdResourceInfoWithScope := crdResourceInfo.WithClusterScope()
|
||||
|
||||
unified, err := grafanaregistry.NewRegistryStore(
|
||||
opts.Scheme,
|
||||
crdResourceInfoWithScope,
|
||||
opts.OptsGetter,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create CRD storage: %w", err)
|
||||
}
|
||||
|
||||
b.storage = unified
|
||||
b.restOptGetter = opts.OptsGetter.(*apistore.RESTOptionsGetter)
|
||||
|
||||
// Initialize dynamic registry for custom resources
|
||||
b.dynamicReg = NewDynamicRegistry(
|
||||
opts.Scheme,
|
||||
opts.OptsGetter,
|
||||
apiGroupInfo,
|
||||
b.accessClient,
|
||||
)
|
||||
if b.server != nil {
|
||||
b.dynamicReg.SetAPIServer(b.server)
|
||||
}
|
||||
|
||||
// Create the dynamic handler for custom resources
|
||||
b.dynamicHandler = NewDynamicCRHandler(opts.Scheme)
|
||||
|
||||
// NOW storage is ready, load CRDs and register their handlers dynamically
|
||||
if err := b.loadAndRegisterCRDsWithDynamicHandler(context.Background(), unified, opts); err != nil {
|
||||
// TODO(@konsalex): use logger here
|
||||
fmt.Printf("failed to load and register CRDs: %v\n", err)
|
||||
// Don't fail - CRDs can be created later
|
||||
}
|
||||
|
||||
// Start watching CRDs for changes (WIP)
|
||||
go b.dynamicReg.Start(context.Background(), unified)
|
||||
|
||||
storage := map[string]rest.Storage{}
|
||||
storage["customresourcedefinitions"] = &crdStorage{
|
||||
Store: unified,
|
||||
dynamicReg: b.dynamicReg,
|
||||
}
|
||||
storage["customresourcedefinitions/status"] = &crdStatusStorage{Store: unified}
|
||||
|
||||
apiGroupInfo.VersionedResourcesStorageMap[apiextensionsv1.SchemeGroupVersion.Version] = storage
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *APIExtensionsBuilder) GetOpenAPIDefinitions() common.GetOpenAPIDefinitions {
|
||||
return func(ref common.ReferenceCallback) map[string]common.OpenAPIDefinition {
|
||||
return apiextensionsopenapi.GetOpenAPIDefinitions(ref)
|
||||
}
|
||||
}
|
||||
69
pkg/registry/apis/apiextensions/resources/example-crd.yaml
Normal file
69
pkg/registry/apis/apiextensions/resources/example-crd.yaml
Normal file
@@ -0,0 +1,69 @@
|
||||
apiVersion: apiextensions.k8s.io/v1
|
||||
kind: CustomResourceDefinition
|
||||
metadata:
|
||||
name: widgets.customcrdtest.grafana.app
|
||||
spec:
|
||||
group: customcrdtest.grafana.app
|
||||
names:
|
||||
kind: Widget
|
||||
listKind: WidgetList
|
||||
plural: widgets
|
||||
singular: widget
|
||||
shortNames:
|
||||
- wg
|
||||
scope: Namespaced
|
||||
versions:
|
||||
- name: v1
|
||||
served: true
|
||||
storage: true
|
||||
schema:
|
||||
openAPIV3Schema:
|
||||
type: object
|
||||
properties:
|
||||
apiVersion:
|
||||
type: string
|
||||
kind:
|
||||
type: string
|
||||
metadata:
|
||||
type: object
|
||||
spec:
|
||||
type: object
|
||||
properties:
|
||||
size:
|
||||
type: string
|
||||
enum:
|
||||
- small
|
||||
- medium
|
||||
- large
|
||||
replicas:
|
||||
type: integer
|
||||
minimum: 1
|
||||
maximum: 10
|
||||
required:
|
||||
- size
|
||||
status:
|
||||
type: object
|
||||
properties:
|
||||
ready:
|
||||
type: boolean
|
||||
message:
|
||||
type: string
|
||||
subresources:
|
||||
status: {}
|
||||
additionalPrinterColumns:
|
||||
- name: Size
|
||||
type: string
|
||||
description: The size of the widget
|
||||
jsonPath: .spec.size
|
||||
- name: Replicas
|
||||
type: integer
|
||||
description: Number of replicas
|
||||
jsonPath: .spec.replicas
|
||||
- name: Ready
|
||||
type: boolean
|
||||
description: Is the widget ready?
|
||||
jsonPath: .status.ready
|
||||
- name: Age
|
||||
type: date
|
||||
jsonPath: .metadata.creationTimestamp
|
||||
|
||||
@@ -0,0 +1,8 @@
|
||||
apiVersion: customcrdtest.grafana.app/v1
|
||||
kind: Widget
|
||||
metadata:
|
||||
name: my-widget
|
||||
namespace: default
|
||||
spec:
|
||||
size: medium
|
||||
replicas: 3
|
||||
111
pkg/registry/apis/apiextensions/resources/outage-crd.yaml
Normal file
111
pkg/registry/apis/apiextensions/resources/outage-crd.yaml
Normal file
@@ -0,0 +1,111 @@
|
||||
apiVersion: apiextensions.k8s.io/v1
|
||||
kind: CustomResourceDefinition
|
||||
metadata:
|
||||
name: outages.monitoring.grafana.app
|
||||
spec:
|
||||
group: monitoring.grafana.app
|
||||
names:
|
||||
kind: Outage
|
||||
listKind: OutageList
|
||||
plural: outages
|
||||
singular: outage
|
||||
shortNames:
|
||||
- out
|
||||
scope: Cluster # ← CLUSTER-SCOPED (not namespaced)
|
||||
versions:
|
||||
- name: v1
|
||||
served: true
|
||||
storage: true
|
||||
schema:
|
||||
openAPIV3Schema:
|
||||
type: object
|
||||
properties:
|
||||
apiVersion:
|
||||
type: string
|
||||
kind:
|
||||
type: string
|
||||
metadata:
|
||||
type: object
|
||||
spec:
|
||||
type: object
|
||||
properties:
|
||||
region:
|
||||
type: string
|
||||
description: Geographic region affected by the outage
|
||||
enum:
|
||||
- us-east-1
|
||||
- us-west-2
|
||||
- eu-west-1
|
||||
- eu-central-1
|
||||
- ap-southeast-1
|
||||
- ap-northeast-1
|
||||
severity:
|
||||
type: string
|
||||
description: Severity level of the outage
|
||||
enum:
|
||||
- critical
|
||||
- major
|
||||
- minor
|
||||
default: major
|
||||
affectedServices:
|
||||
type: array
|
||||
description: List of services affected
|
||||
items:
|
||||
type: string
|
||||
startTime:
|
||||
type: string
|
||||
format: date-time
|
||||
description: When the outage started
|
||||
description:
|
||||
type: string
|
||||
description: Description of the outage
|
||||
required:
|
||||
- region
|
||||
- severity
|
||||
- startTime
|
||||
status:
|
||||
type: object
|
||||
properties:
|
||||
resolved:
|
||||
type: boolean
|
||||
description: Whether the outage has been resolved
|
||||
resolvedAt:
|
||||
type: string
|
||||
format: date-time
|
||||
description: When the outage was resolved
|
||||
affectedCustomers:
|
||||
type: integer
|
||||
description: Number of customers affected
|
||||
updates:
|
||||
type: array
|
||||
description: Status updates
|
||||
items:
|
||||
type: object
|
||||
properties:
|
||||
timestamp:
|
||||
type: string
|
||||
format: date-time
|
||||
message:
|
||||
type: string
|
||||
subresources:
|
||||
status: {}
|
||||
additionalPrinterColumns:
|
||||
- name: Region
|
||||
type: string
|
||||
description: Geographic region
|
||||
jsonPath: .spec.region
|
||||
- name: Severity
|
||||
type: string
|
||||
description: Severity level
|
||||
jsonPath: .spec.severity
|
||||
- name: Resolved
|
||||
type: boolean
|
||||
description: Resolution status
|
||||
jsonPath: .status.resolved
|
||||
- name: Start Time
|
||||
type: date
|
||||
description: When the outage started
|
||||
jsonPath: .spec.startTime
|
||||
- name: Age
|
||||
type: date
|
||||
jsonPath: .metadata.creationTimestamp
|
||||
@@ -0,0 +1,22 @@
|
||||
apiVersion: monitoring.grafana.app/v1
|
||||
kind: Outage
|
||||
metadata:
|
||||
name: outage-2025-11-20-us-east
|
||||
# NO namespace field - this is cluster-scoped!
|
||||
spec:
|
||||
region: us-east-1
|
||||
severity: critical
|
||||
affectedServices:
|
||||
- grafana-cloud-metrics
|
||||
- grafana-cloud-logs
|
||||
- grafana-cloud-traces
|
||||
startTime: "2025-11-20T10:00:00Z"
|
||||
description: "Database connectivity issues affecting multiple services in US East region"
|
||||
status:
|
||||
resolved: false
|
||||
affectedCustomers: 1247
|
||||
updates:
|
||||
- timestamp: "2025-11-20T10:15:00Z"
|
||||
message: "Incident detected, investigating database connectivity"
|
||||
- timestamp: "2025-11-20T10:30:00Z"
|
||||
message: "Root cause identified, applying fix to primary database cluster"
|
||||
176
pkg/registry/apis/apiextensions/storage.go
Normal file
176
pkg/registry/apis/apiextensions/storage.go
Normal file
@@ -0,0 +1,176 @@
|
||||
package apiextensions
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
|
||||
"k8s.io/apiserver/pkg/registry/rest"
|
||||
)
|
||||
|
||||
var _ rest.StandardStorage = (*crdStorage)(nil)
|
||||
var _ rest.Scoper = (*crdStorage)(nil)
|
||||
|
||||
// crdStorage wraps the generic registry store and adds CRD-specific validation and dynamic registration
|
||||
type crdStorage struct {
|
||||
*genericregistry.Store
|
||||
dynamicReg *DynamicRegistry
|
||||
}
|
||||
|
||||
// NamespaceScoped returns false since CRDs are cluster-scoped resources
|
||||
func (s *crdStorage) NamespaceScoped() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// Create validates and creates a CRD, then triggers dynamic registration of the custom resource
|
||||
func (s *crdStorage) Create(
|
||||
ctx context.Context,
|
||||
obj runtime.Object,
|
||||
createValidation rest.ValidateObjectFunc,
|
||||
options *metav1.CreateOptions,
|
||||
) (runtime.Object, error) {
|
||||
crd, ok := obj.(*apiextensionsv1.CustomResourceDefinition)
|
||||
if !ok {
|
||||
return nil, apierrors.NewBadRequest("object is not a CustomResourceDefinition")
|
||||
}
|
||||
|
||||
if err := validateCRDForPhase1(crd); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Create the CRD in storage
|
||||
result, err := s.Store.Create(ctx, obj, createValidation, options)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Register the custom resource dynamically
|
||||
createdCRD, ok := result.(*apiextensionsv1.CustomResourceDefinition)
|
||||
if ok {
|
||||
if err := s.dynamicReg.RegisterCRD(createdCRD); err != nil {
|
||||
// Log error but don't fail the creation
|
||||
// TODO: Add proper logging
|
||||
fmt.Printf("Warning: failed to register CRD dynamically: %v\n", err)
|
||||
}
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// Update validates and updates a CRD, then updates the dynamic registration
|
||||
func (s *crdStorage) Update(
|
||||
ctx context.Context,
|
||||
name string,
|
||||
objInfo rest.UpdatedObjectInfo,
|
||||
createValidation rest.ValidateObjectFunc,
|
||||
updateValidation rest.ValidateObjectUpdateFunc,
|
||||
forceAllowCreate bool,
|
||||
options *metav1.UpdateOptions,
|
||||
) (runtime.Object, bool, error) {
|
||||
result, created, err := s.Store.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
// Update dynamic registration
|
||||
updatedCRD, ok := result.(*apiextensionsv1.CustomResourceDefinition)
|
||||
if ok {
|
||||
if err := s.dynamicReg.UpdateCRD(updatedCRD); err != nil {
|
||||
fmt.Printf("Warning: failed to update CRD registration: %v\n", err)
|
||||
}
|
||||
}
|
||||
|
||||
return result, created, nil
|
||||
}
|
||||
|
||||
// Delete deletes a CRD and unregisters the custom resource
|
||||
func (s *crdStorage) Delete(
|
||||
ctx context.Context,
|
||||
name string,
|
||||
deleteValidation rest.ValidateObjectFunc,
|
||||
options *metav1.DeleteOptions,
|
||||
) (runtime.Object, bool, error) {
|
||||
// Get the CRD before deletion
|
||||
obj, err := s.Store.Get(ctx, name, &metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
crd, ok := obj.(*apiextensionsv1.CustomResourceDefinition)
|
||||
if !ok {
|
||||
return nil, false, apierrors.NewInternalError(fmt.Errorf("object is not a CRD"))
|
||||
}
|
||||
|
||||
// Delete the CRD
|
||||
result, immediate, err := s.Store.Delete(ctx, name, deleteValidation, options)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
// Unregister the custom resource
|
||||
if err := s.dynamicReg.UnregisterCRD(crd); err != nil {
|
||||
fmt.Printf("Warning: failed to unregister CRD: %v\n", err)
|
||||
}
|
||||
|
||||
return result, immediate, nil
|
||||
}
|
||||
|
||||
func validateCRDForPhase1(crd *apiextensionsv1.CustomResourceDefinition) error {
|
||||
// TODO(@konsalex): only support single-version CRDs for now
|
||||
if len(crd.Spec.Versions) != 1 {
|
||||
return apierrors.NewBadRequest(
|
||||
fmt.Sprintf("we only support single-version CRDs, got %d versions", len(crd.Spec.Versions)),
|
||||
)
|
||||
}
|
||||
|
||||
// TODO(@konsalex): no webhook conversion for now we will need to support this
|
||||
if crd.Spec.Conversion != nil && crd.Spec.Conversion.Strategy == apiextensionsv1.WebhookConverter {
|
||||
return apierrors.NewBadRequest("no support webhook conversion")
|
||||
}
|
||||
|
||||
// Ensure the single version is marked as both served and storage
|
||||
if len(crd.Spec.Versions) > 0 {
|
||||
version := crd.Spec.Versions[0]
|
||||
if !version.Served {
|
||||
return apierrors.NewBadRequest("the single version must be marked as served")
|
||||
}
|
||||
if !version.Storage {
|
||||
return apierrors.NewBadRequest("the single version must be marked as storage")
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// crdStatusStorage handles the status subresource for CRDs
|
||||
type crdStatusStorage struct {
|
||||
*genericregistry.Store
|
||||
}
|
||||
|
||||
func (s *crdStatusStorage) New() runtime.Object {
|
||||
return &apiextensionsv1.CustomResourceDefinition{}
|
||||
}
|
||||
|
||||
func (s *crdStatusStorage) Get(
|
||||
ctx context.Context,
|
||||
name string,
|
||||
options *metav1.GetOptions,
|
||||
) (runtime.Object, error) {
|
||||
return s.Store.Get(ctx, name, options)
|
||||
}
|
||||
|
||||
func (s *crdStatusStorage) Update(
|
||||
ctx context.Context,
|
||||
name string,
|
||||
objInfo rest.UpdatedObjectInfo,
|
||||
createValidation rest.ValidateObjectFunc,
|
||||
updateValidation rest.ValidateObjectUpdateFunc,
|
||||
forceAllowCreate bool,
|
||||
options *metav1.UpdateOptions,
|
||||
) (runtime.Object, bool, error) {
|
||||
return s.Store.Update(ctx, name, objInfo, createValidation, updateValidation, false, options)
|
||||
}
|
||||
687
pkg/registry/apis/apiextensions/storage_cr.go
Normal file
687
pkg/registry/apis/apiextensions/storage_cr.go
Normal file
@@ -0,0 +1,687 @@
|
||||
package apiextensions
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
jsonpatch "github.com/evanphx/json-patch"
|
||||
"github.com/google/uuid"
|
||||
authlib "github.com/grafana/authlib/types"
|
||||
apiextensions "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions"
|
||||
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
|
||||
"k8s.io/apiextensions-apiserver/pkg/apiserver/validation"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/json"
|
||||
"k8s.io/apimachinery/pkg/util/validation/field"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/apiserver/pkg/registry/generic"
|
||||
"k8s.io/apiserver/pkg/registry/rest"
|
||||
"k8s.io/apiserver/pkg/storage"
|
||||
|
||||
"github.com/grafana/grafana/pkg/apimachinery/utils"
|
||||
grafanaregistry "github.com/grafana/grafana/pkg/apiserver/registry/generic"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/apistore"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resource"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
||||
)
|
||||
|
||||
var _ rest.StandardStorage = (*customResourceStorage)(nil)
|
||||
var _ rest.Patcher = (*customResourceStorage)(nil)
|
||||
var _ rest.Scoper = (*customResourceStorage)(nil)
|
||||
|
||||
var (
|
||||
internalScheme = runtime.NewScheme()
|
||||
)
|
||||
|
||||
func init() {
|
||||
_ = apiextensionsv1.AddToScheme(internalScheme)
|
||||
_ = apiextensions.AddToScheme(internalScheme)
|
||||
}
|
||||
|
||||
// customResourceStorage implements REST storage for custom resource instances
|
||||
// It bypasses the generic registry store and calls unified storage directly
|
||||
// to properly handle unstructured objects
|
||||
type customResourceStorage struct {
|
||||
storage storage.Interface // Direct unified storage interface
|
||||
crd *apiextensionsv1.CustomResourceDefinition
|
||||
version string
|
||||
gvk schema.GroupVersionKind
|
||||
gvr schema.GroupVersionResource
|
||||
accessClient authlib.AccessClient
|
||||
keyFunc func(ctx context.Context, name string) (string, error)
|
||||
keyRootFunc func(ctx context.Context) string
|
||||
}
|
||||
|
||||
// NewCustomResourceStorage creates storage for a custom resource based on its CRD
|
||||
func NewCustomResourceStorage(
|
||||
crd *apiextensionsv1.CustomResourceDefinition,
|
||||
version string,
|
||||
scheme *runtime.Scheme,
|
||||
optsGetter generic.RESTOptionsGetter,
|
||||
accessClient authlib.AccessClient,
|
||||
unifiedClient resource.ResourceClient,
|
||||
) (*customResourceStorage, error) {
|
||||
gvk := schema.GroupVersionKind{
|
||||
Group: crd.Spec.Group,
|
||||
Version: version,
|
||||
Kind: crd.Spec.Names.Kind,
|
||||
}
|
||||
|
||||
gvr := schema.GroupVersionResource{
|
||||
Group: crd.Spec.Group,
|
||||
Version: version,
|
||||
Resource: crd.Spec.Names.Plural,
|
||||
}
|
||||
|
||||
// Register the custom resource type as unstructured
|
||||
scheme.AddKnownTypeWithName(gvk, &unstructured.Unstructured{})
|
||||
listGVK := gvk
|
||||
listGVK.Kind = crd.Spec.Names.ListKind
|
||||
scheme.AddKnownTypeWithName(listGVK, &unstructured.UnstructuredList{})
|
||||
|
||||
// Create resource info for this custom resource
|
||||
resourceInfo := utils.NewResourceInfo(
|
||||
crd.Spec.Group,
|
||||
version,
|
||||
crd.Spec.Names.Plural,
|
||||
crd.Spec.Names.Singular,
|
||||
crd.Spec.Names.Kind,
|
||||
func() runtime.Object {
|
||||
u := &unstructured.Unstructured{}
|
||||
u.SetGroupVersionKind(gvk)
|
||||
return u
|
||||
},
|
||||
func() runtime.Object {
|
||||
ul := &unstructured.UnstructuredList{}
|
||||
ul.SetGroupVersionKind(schema.GroupVersionKind{
|
||||
Group: crd.Spec.Group,
|
||||
Version: version,
|
||||
Kind: crd.Spec.Names.ListKind,
|
||||
})
|
||||
return ul
|
||||
},
|
||||
utils.TableColumns{},
|
||||
)
|
||||
|
||||
// Make it cluster-scoped if needed
|
||||
if crd.Spec.Scope == apiextensionsv1.ClusterScoped {
|
||||
resourceInfo = resourceInfo.WithClusterScope()
|
||||
}
|
||||
|
||||
// Register storage options
|
||||
if restOptGetter, ok := optsGetter.(*apistore.RESTOptionsGetter); ok {
|
||||
restOptGetter.RegisterOptions(
|
||||
gvr.GroupResource(),
|
||||
apistore.StorageOptions{},
|
||||
)
|
||||
}
|
||||
|
||||
// We need this to set the codec below
|
||||
gr := gvr.GroupResource()
|
||||
opts, err := optsGetter.GetRESTOptions(gr, &unstructured.Unstructured{})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get REST options: %w", err)
|
||||
}
|
||||
|
||||
// This codec can handle any dynamic type without compile-time registration
|
||||
// Without explicitly defined we get this error:
|
||||
// https://github.com/kubernetes/apimachinery/blob/5a348c53eef0072c40ddf00a45ace423c2790f2a/pkg/runtime/error.go#L52
|
||||
opts.StorageConfig.Codec = unstructured.UnstructuredJSONScheme
|
||||
|
||||
// Get the ConfigForResource
|
||||
config := opts.StorageConfig.ForResource(gr)
|
||||
|
||||
// Create key functions for storage
|
||||
keyFunc := func(obj runtime.Object) (string, error) {
|
||||
accessor, err := utils.MetaAccessor(obj)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
name := accessor.GetName()
|
||||
ns := accessor.GetNamespace()
|
||||
|
||||
key := &grafanaregistry.Key{
|
||||
Group: gr.Group,
|
||||
Resource: gr.Resource,
|
||||
Namespace: ns,
|
||||
Name: name,
|
||||
}
|
||||
return key.String(), nil
|
||||
}
|
||||
|
||||
keyParser := func(key string) (*resourcepb.ResourceKey, error) {
|
||||
k, err := grafanaregistry.ParseKey(key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &resourcepb.ResourceKey{
|
||||
Namespace: k.Namespace,
|
||||
Group: k.Group,
|
||||
Resource: k.Resource,
|
||||
Name: k.Name,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Create the actual storage using apistore.NewStorage
|
||||
underlyingStorage, _, err := apistore.NewStorage(
|
||||
config,
|
||||
unifiedClient,
|
||||
keyFunc,
|
||||
keyParser,
|
||||
func() runtime.Object { return &unstructured.Unstructured{} },
|
||||
func() runtime.Object { return &unstructured.UnstructuredList{} },
|
||||
grafanaregistry.GetAttrs,
|
||||
nil, // trigger
|
||||
nil, // indexers
|
||||
nil, // configProvider
|
||||
apistore.StorageOptions{},
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create storage: %w", err)
|
||||
}
|
||||
|
||||
return &customResourceStorage{
|
||||
storage: underlyingStorage,
|
||||
crd: crd,
|
||||
version: version,
|
||||
gvk: gvk,
|
||||
gvr: gvr,
|
||||
accessClient: accessClient,
|
||||
keyFunc: grafanaregistry.NamespaceKeyFunc(gr),
|
||||
keyRootFunc: grafanaregistry.KeyRootFunc(gr),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// NamespaceScoped returns whether this custom resource is namespaced
|
||||
func (s *customResourceStorage) NamespaceScoped() bool {
|
||||
return s.crd.Spec.Scope == apiextensionsv1.NamespaceScoped
|
||||
}
|
||||
|
||||
// New returns a new instance of the custom resource
|
||||
func (s *customResourceStorage) New() runtime.Object {
|
||||
return &unstructured.Unstructured{}
|
||||
}
|
||||
|
||||
// NewList returns a new list instance
|
||||
func (s *customResourceStorage) NewList() runtime.Object {
|
||||
return &unstructured.UnstructuredList{}
|
||||
}
|
||||
|
||||
// getValidator returns a validator for the custom resource
|
||||
func (s *customResourceStorage) getValidator() (validation.SchemaValidator, error) {
|
||||
// Find the version we are serving
|
||||
var version *apiextensionsv1.CustomResourceDefinitionVersion
|
||||
for i := range s.crd.Spec.Versions {
|
||||
v := &s.crd.Spec.Versions[i]
|
||||
if v.Name == s.version {
|
||||
version = v
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// If no schema is defined, we can't validate
|
||||
if version == nil || version.Schema == nil || version.Schema.OpenAPIV3Schema == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Convert v1 schema to internal schema
|
||||
internalSchema := &apiextensions.JSONSchemaProps{}
|
||||
if err := internalScheme.Convert(version.Schema.OpenAPIV3Schema, internalSchema, nil); err != nil {
|
||||
return nil, fmt.Errorf("failed to convert schema to internal version: %w", err)
|
||||
}
|
||||
|
||||
// Create validator
|
||||
val, _, err := validation.NewSchemaValidator(internalSchema)
|
||||
return val, err
|
||||
}
|
||||
|
||||
// Create validates and creates a custom resource instance
|
||||
func (s *customResourceStorage) Create(
|
||||
ctx context.Context,
|
||||
obj runtime.Object,
|
||||
createValidation rest.ValidateObjectFunc,
|
||||
options *metav1.CreateOptions,
|
||||
) (runtime.Object, error) {
|
||||
// Ensure the object is unstructured as it is a "custom" CRD
|
||||
u, ok := obj.(*unstructured.Unstructured)
|
||||
if !ok {
|
||||
return nil, apierrors.NewBadRequest("object must be unstructured")
|
||||
}
|
||||
|
||||
// Validate GVK matches the CRD
|
||||
// Could it arrive here without a match from storage match failure?
|
||||
objGVK := u.GroupVersionKind()
|
||||
if objGVK.Group != s.gvk.Group || objGVK.Version != s.gvk.Version || objGVK.Kind != s.gvk.Kind {
|
||||
return nil, apierrors.NewBadRequest(
|
||||
fmt.Sprintf("object GVK %s does not match CRD GVK %s", objGVK.String(), s.gvk.String()),
|
||||
)
|
||||
}
|
||||
|
||||
// Set defaults if not present
|
||||
if u.GetNamespace() == "" && s.NamespaceScoped() {
|
||||
u.SetNamespace("default")
|
||||
}
|
||||
if u.GetName() == "" {
|
||||
u.SetName(uuid.New().String())
|
||||
}
|
||||
|
||||
// Ensure GVK is set
|
||||
u.SetGroupVersionKind(s.gvk)
|
||||
|
||||
// Run validation if provided
|
||||
if createValidation != nil {
|
||||
if err := createValidation(ctx, obj); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// Validate against OpenAPI schema
|
||||
validator, err := s.getValidator()
|
||||
if err != nil {
|
||||
return nil, apierrors.NewInternalError(fmt.Errorf("failed to get validator: %v", err))
|
||||
}
|
||||
if validator != nil {
|
||||
if errs := validation.ValidateCustomResource(field.NewPath(""), u.UnstructuredContent(), validator); len(errs) > 0 {
|
||||
return nil, apierrors.NewInvalid(s.gvk.GroupKind(), u.GetName(), errs)
|
||||
}
|
||||
}
|
||||
|
||||
// Generate storage key
|
||||
key, err := s.keyFunc(ctx, u.GetName())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to generate key: %w", err)
|
||||
}
|
||||
|
||||
if options != nil && len(options.DryRun) > 0 {
|
||||
return u, nil
|
||||
}
|
||||
|
||||
// Create the object in storage
|
||||
// TODO(@konsalex): Figure out if this ttl should be (!=0)
|
||||
out := &unstructured.Unstructured{}
|
||||
if err := s.storage.Create(ctx, key, obj, out, 0); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// Get retrieves a custom resource instance by name
|
||||
func (s *customResourceStorage) Get(
|
||||
ctx context.Context,
|
||||
name string,
|
||||
options *metav1.GetOptions,
|
||||
) (runtime.Object, error) {
|
||||
key, err := s.keyFunc(ctx, name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
out := &unstructured.Unstructured{}
|
||||
getOpts := storage.GetOptions{
|
||||
IgnoreNotFound: false,
|
||||
ResourceVersion: "",
|
||||
}
|
||||
if options != nil {
|
||||
getOpts.ResourceVersion = options.ResourceVersion
|
||||
}
|
||||
if err := s.storage.Get(ctx, key, getOpts, out); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// List retrieves a list of custom resource instances
|
||||
func (s *customResourceStorage) List(
|
||||
ctx context.Context,
|
||||
options *metainternalversion.ListOptions,
|
||||
) (runtime.Object, error) {
|
||||
// Create an empty list to populate
|
||||
listObj := &unstructured.UnstructuredList{}
|
||||
listObj.SetGroupVersionKind(schema.GroupVersionKind{
|
||||
Group: s.gvk.Group,
|
||||
Version: s.gvk.Version,
|
||||
Kind: s.crd.Spec.Names.ListKind,
|
||||
})
|
||||
|
||||
// Handle nil options
|
||||
if options == nil {
|
||||
options = &metainternalversion.ListOptions{}
|
||||
}
|
||||
|
||||
// Convert metainternalversion.ListOptions to storage.ListOptions
|
||||
listOpts := storage.ListOptions{
|
||||
ResourceVersion: options.ResourceVersion,
|
||||
ResourceVersionMatch: options.ResourceVersionMatch,
|
||||
Predicate: storage.Everything,
|
||||
}
|
||||
|
||||
if options.LabelSelector != nil {
|
||||
listOpts.Predicate.Label = options.LabelSelector
|
||||
}
|
||||
if options.FieldSelector != nil {
|
||||
listOpts.Predicate.Field = options.FieldSelector
|
||||
}
|
||||
|
||||
// Get the key prefix for listing
|
||||
keyPrefix := s.keyRootFunc(ctx)
|
||||
|
||||
// Call the underlying storage List/GetList
|
||||
if err := s.storage.GetList(ctx, keyPrefix, listOpts, listObj); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return listObj, nil
|
||||
}
|
||||
|
||||
// Delete removes a custom resource instance
|
||||
func (s *customResourceStorage) Delete(
|
||||
ctx context.Context,
|
||||
name string,
|
||||
deleteValidation rest.ValidateObjectFunc,
|
||||
options *metav1.DeleteOptions,
|
||||
) (runtime.Object, bool, error) {
|
||||
key, err := s.keyFunc(ctx, name)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
out := &unstructured.Unstructured{}
|
||||
preconditions := &storage.Preconditions{}
|
||||
if options != nil && options.Preconditions != nil {
|
||||
if options.Preconditions.UID != nil {
|
||||
preconditions.UID = options.Preconditions.UID
|
||||
}
|
||||
if options.Preconditions.ResourceVersion != nil {
|
||||
preconditions.ResourceVersion = options.Preconditions.ResourceVersion
|
||||
}
|
||||
}
|
||||
|
||||
validateFunc := func(ctx context.Context, obj runtime.Object) error {
|
||||
if deleteValidation != nil {
|
||||
return deleteValidation(ctx, obj)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
if options != nil && len(options.DryRun) > 0 {
|
||||
// We need to fetch it to make sure it exists and to return it
|
||||
if err := s.storage.Get(ctx, key, storage.GetOptions{}, out); err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
if err := validateFunc(ctx, out); err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
return out, true, nil
|
||||
}
|
||||
|
||||
if err := s.storage.Delete(ctx, key, out, preconditions, validateFunc, out, storage.DeleteOptions{}); err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
return out, true, nil
|
||||
}
|
||||
|
||||
// Update updates a custom resource instance
|
||||
func (s *customResourceStorage) Update(
|
||||
ctx context.Context,
|
||||
name string,
|
||||
objInfo rest.UpdatedObjectInfo,
|
||||
createValidation rest.ValidateObjectFunc,
|
||||
updateValidation rest.ValidateObjectUpdateFunc,
|
||||
forceAllowCreate bool,
|
||||
options *metav1.UpdateOptions,
|
||||
) (runtime.Object, bool, error) {
|
||||
key, err := s.keyFunc(ctx, name)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
// Get the existing object
|
||||
existingObj := &unstructured.Unstructured{}
|
||||
existingObj.SetGroupVersionKind(s.gvk)
|
||||
|
||||
err = s.storage.Get(ctx, key, storage.GetOptions{}, existingObj)
|
||||
if err != nil {
|
||||
if storage.IsNotFound(err) {
|
||||
if !forceAllowCreate {
|
||||
return nil, false, apierrors.NewNotFound(s.gvr.GroupResource(), name)
|
||||
}
|
||||
// If forceAllowCreate is true, treat as a create
|
||||
// and not as an update
|
||||
newObj, err := objInfo.UpdatedObject(ctx, nil)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
if createValidation != nil {
|
||||
if err := createValidation(ctx, newObj); err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
}
|
||||
|
||||
// Call Create internally
|
||||
created, err := s.Create(ctx, newObj, nil, nil)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
return created, true, nil
|
||||
}
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
// Get the updated object
|
||||
updatedObj, err := objInfo.UpdatedObject(ctx, existingObj)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
// Validate the update
|
||||
if updateValidation != nil {
|
||||
if err := updateValidation(ctx, updatedObj, existingObj); err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure it's an unstructured object
|
||||
updatedUnstructured, ok := updatedObj.(*unstructured.Unstructured)
|
||||
if !ok {
|
||||
return nil, false, fmt.Errorf("updated object is not unstructured")
|
||||
}
|
||||
|
||||
// Set the GVK
|
||||
updatedUnstructured.SetGroupVersionKind(s.gvk)
|
||||
|
||||
// Ensure namespace and name are set correctly
|
||||
if s.NamespaceScoped() {
|
||||
ns := updatedUnstructured.GetNamespace()
|
||||
if ns == "" {
|
||||
updatedUnstructured.SetNamespace("default")
|
||||
}
|
||||
}
|
||||
updatedUnstructured.SetName(name)
|
||||
|
||||
// Validate against OpenAPI schema
|
||||
validator, err := s.getValidator()
|
||||
if err != nil {
|
||||
return nil, false, apierrors.NewInternalError(fmt.Errorf("failed to get validator: %v", err))
|
||||
}
|
||||
if validator != nil {
|
||||
// We need the old object as interface{}
|
||||
if errs := validation.ValidateCustomResourceUpdate(field.NewPath(""), updatedUnstructured.UnstructuredContent(), existingObj.UnstructuredContent(), validator); len(errs) > 0 {
|
||||
return nil, false, apierrors.NewInvalid(s.gvk.GroupKind(), name, errs)
|
||||
}
|
||||
}
|
||||
|
||||
// Use GuaranteedUpdate for optimistic concurrency control
|
||||
out := &unstructured.Unstructured{}
|
||||
out.SetGroupVersionKind(s.gvk)
|
||||
|
||||
updateFunc := func(input runtime.Object, respMeta storage.ResponseMeta) (runtime.Object, *uint64, error) {
|
||||
// Return the updated object
|
||||
return updatedUnstructured, nil, nil
|
||||
}
|
||||
|
||||
if options != nil && len(options.DryRun) > 0 {
|
||||
return updatedUnstructured, false, nil
|
||||
}
|
||||
|
||||
preconditions := &storage.Preconditions{}
|
||||
|
||||
// We explicitly ignore the not-found, as we checked before proceeding
|
||||
err = s.storage.GuaranteedUpdate(ctx, key, out, true, preconditions, updateFunc, out)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
return out, false, nil
|
||||
}
|
||||
|
||||
// Patch patches a custom resource instance
|
||||
func (s *customResourceStorage) Patch(
|
||||
ctx context.Context,
|
||||
name string,
|
||||
patchType types.PatchType,
|
||||
patchBytes []byte,
|
||||
options *metav1.PatchOptions,
|
||||
subresources ...string,
|
||||
) (runtime.Object, error) {
|
||||
key, err := s.keyFunc(ctx, name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
existingObj := &unstructured.Unstructured{}
|
||||
existingObj.SetGroupVersionKind(s.gvk)
|
||||
|
||||
err = s.storage.Get(ctx, key, storage.GetOptions{}, existingObj)
|
||||
if err != nil {
|
||||
if storage.IsNotFound(err) {
|
||||
return nil, apierrors.NewNotFound(s.gvr.GroupResource(), name)
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
existingJSON, err := json.Marshal(existingObj.Object)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to marshal existing object: %w", err)
|
||||
}
|
||||
|
||||
var patchedJSON []byte
|
||||
|
||||
switch patchType {
|
||||
case types.JSONPatchType:
|
||||
patch, err := jsonpatch.DecodePatch(patchBytes)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to decode JSON patch: %w", err)
|
||||
}
|
||||
patchedJSON, err = patch.Apply(existingJSON)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to apply JSON patch: %w", err)
|
||||
}
|
||||
|
||||
case types.MergePatchType:
|
||||
patchedJSON, err = jsonpatch.MergePatch(existingJSON, patchBytes)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to apply merge patch: %w", err)
|
||||
}
|
||||
|
||||
case types.StrategicMergePatchType:
|
||||
// Kubernetes Strategic Merge Patch
|
||||
// For unstructured objects, strategic merge patch behaves like merge patch
|
||||
// because we don't have a Go struct to define merge strategies
|
||||
// TODO(@konsalex): Clarify is we need to even support this by falling-back to merge patch, or just return an error to inform clients
|
||||
// patchedJSON, err = strategicpatch.StrategicMergePatch(existingJSON, patchBytes, &unstructured.Unstructured{})
|
||||
|
||||
// Fallback to merge patch if strategic merge fails
|
||||
patchedJSON, err = jsonpatch.MergePatch(existingJSON, patchBytes)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to apply patch: %w", err)
|
||||
}
|
||||
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported patch type: %s", patchType)
|
||||
}
|
||||
|
||||
// Unmarshal the patched JSON into an unstructured object
|
||||
patchedObj := &unstructured.Unstructured{}
|
||||
if err := json.Unmarshal(patchedJSON, &patchedObj.Object); err != nil {
|
||||
return nil, fmt.Errorf("failed to unmarshal patched object: %w", err)
|
||||
}
|
||||
|
||||
// Set the GVK
|
||||
patchedObj.SetGroupVersionKind(s.gvk)
|
||||
|
||||
// Ensure namespace and name are set correctly
|
||||
if s.NamespaceScoped() {
|
||||
ns := patchedObj.GetNamespace()
|
||||
if ns == "" {
|
||||
patchedObj.SetNamespace(existingObj.GetNamespace())
|
||||
}
|
||||
}
|
||||
patchedObj.SetName(name)
|
||||
|
||||
// Validate against OpenAPI schema
|
||||
validator, err := s.getValidator()
|
||||
if err != nil {
|
||||
return nil, apierrors.NewInternalError(fmt.Errorf("failed to get validator: %v", err))
|
||||
}
|
||||
if validator != nil {
|
||||
// We need the old object as interface{}
|
||||
if errs := validation.ValidateCustomResourceUpdate(field.NewPath(""), patchedObj.UnstructuredContent(), existingObj.UnstructuredContent(), validator); len(errs) > 0 {
|
||||
return nil, apierrors.NewInvalid(s.gvk.GroupKind(), name, errs)
|
||||
}
|
||||
}
|
||||
|
||||
// Use GuaranteedUpdate to save the patched object
|
||||
out := &unstructured.Unstructured{}
|
||||
out.SetGroupVersionKind(s.gvk)
|
||||
|
||||
updateFunc := func(input runtime.Object, respMeta storage.ResponseMeta) (runtime.Object, *uint64, error) {
|
||||
return patchedObj, nil, nil
|
||||
}
|
||||
|
||||
preconditions := &storage.Preconditions{}
|
||||
if options != nil && options.DryRun != nil && len(options.DryRun) > 0 {
|
||||
fmt.Printf(" - Dry run mode\n")
|
||||
}
|
||||
|
||||
err = s.storage.GuaranteedUpdate(ctx, key, out, true, preconditions, updateFunc, out)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// DeleteCollection deletes a collection of custom resources
|
||||
func (s *customResourceStorage) DeleteCollection(
|
||||
ctx context.Context,
|
||||
deleteValidation rest.ValidateObjectFunc,
|
||||
options *metav1.DeleteOptions,
|
||||
listOptions *metainternalversion.ListOptions,
|
||||
) (runtime.Object, error) {
|
||||
return nil, apierrors.NewMethodNotSupported(s.gvr.GroupResource(), "deletecollection")
|
||||
}
|
||||
|
||||
// Watch returns a watch interface for custom resources
|
||||
func (s *customResourceStorage) Watch(ctx context.Context, options *metainternalversion.ListOptions) (watch.Interface, error) {
|
||||
return nil, apierrors.NewMethodNotSupported(s.gvr.GroupResource(), "watch")
|
||||
}
|
||||
|
||||
// ConvertToTable converts to a table for kubectl
|
||||
func (s *customResourceStorage) ConvertToTable(ctx context.Context, object runtime.Object, tableOptions runtime.Object) (*metav1.Table, error) {
|
||||
return nil, apierrors.NewMethodNotSupported(s.gvr.GroupResource(), "table")
|
||||
}
|
||||
|
||||
// Destroy cleans up resources
|
||||
func (s *customResourceStorage) Destroy() {
|
||||
// Nothing to clean up for direct storage
|
||||
}
|
||||
@@ -1,6 +1,7 @@
|
||||
package apiregistry
|
||||
|
||||
import (
|
||||
"github.com/grafana/grafana/pkg/registry/apis/apiextensions"
|
||||
dashboardinternal "github.com/grafana/grafana/pkg/registry/apis/dashboard"
|
||||
"github.com/grafana/grafana/pkg/registry/apis/dashboardsnapshot"
|
||||
"github.com/grafana/grafana/pkg/registry/apis/datasource"
|
||||
@@ -19,6 +20,7 @@ type Service struct{}
|
||||
// ProvideRegistryServiceSink is an entry point for each service that will force initialization
|
||||
// and give each builder the chance to register itself with the main server
|
||||
func ProvideRegistryServiceSink(
|
||||
_ *apiextensions.APIExtensionsBuilder,
|
||||
_ *dashboardinternal.DashboardsAPIBuilder,
|
||||
_ *dashboardsnapshot.SnapshotsAPIBuilder,
|
||||
_ *datasource.DataSourceAPIBuilder,
|
||||
|
||||
@@ -3,6 +3,7 @@ package apiregistry
|
||||
import (
|
||||
"github.com/google/wire"
|
||||
|
||||
"github.com/grafana/grafana/pkg/registry/apis/apiextensions"
|
||||
dashboardinternal "github.com/grafana/grafana/pkg/registry/apis/dashboard"
|
||||
"github.com/grafana/grafana/pkg/registry/apis/dashboardsnapshot"
|
||||
"github.com/grafana/grafana/pkg/registry/apis/datasource"
|
||||
@@ -54,6 +55,7 @@ var WireSet = wire.NewSet(
|
||||
provisioningExtras,
|
||||
|
||||
// Each must be added here *and* in the ServiceSink above
|
||||
apiextensions.RegisterAPIService,
|
||||
dashboardinternal.RegisterAPIService,
|
||||
dashboardsnapshot.RegisterAPIService,
|
||||
datasource.RegisterAPIService,
|
||||
|
||||
13
pkg/server/wire_gen.go
generated
13
pkg/server/wire_gen.go
generated
@@ -48,6 +48,7 @@ import (
|
||||
"github.com/grafana/grafana/pkg/plugins/pluginscdn"
|
||||
"github.com/grafana/grafana/pkg/plugins/repo"
|
||||
"github.com/grafana/grafana/pkg/registry/apis"
|
||||
"github.com/grafana/grafana/pkg/registry/apis/apiextensions"
|
||||
"github.com/grafana/grafana/pkg/registry/apis/dashboard"
|
||||
"github.com/grafana/grafana/pkg/registry/apis/dashboard/legacy"
|
||||
"github.com/grafana/grafana/pkg/registry/apis/dashboardsnapshot"
|
||||
@@ -848,6 +849,10 @@ func Initialize(ctx context.Context, cfg *setting.Cfg, opts Options, apiOpts api
|
||||
identitySynchronizer := authnimpl.ProvideIdentitySynchronizer(authnimplService)
|
||||
ldapImpl := service12.ProvideService(cfg, featureToggles, ssosettingsimplService)
|
||||
apiService := api4.ProvideService(cfg, routeRegisterImpl, accessControl, userService, authinfoimplService, ossGroups, identitySynchronizer, orgService, ldapImpl, userAuthTokenService, bundleregistryService)
|
||||
apiExtensionsBuilder, err := apiextensions.RegisterAPIService(cfg, featureToggles, apiserverService, accessClient, registerer, resourceClient)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
dashboardsAPIBuilder := dashboard.RegisterAPIService(cfg, featureToggles, apiserverService, dashboardService, dashboardProvisioningService, service15, dashboardServiceImpl, dashboardPermissionsService, accessControl, accessClient, provisioningServiceImpl, dashboardsStore, registerer, sqlStore, tracingService, resourceClient, dualwriteService, sortService, quotaService, libraryPanelService, eventualRestConfigProvider, userService, libraryElementService, publicDashboardServiceImpl)
|
||||
snapshotsAPIBuilder := dashboardsnapshot.RegisterAPIService(serviceImpl, apiserverService, cfg, featureToggles, sqlStore, registerer)
|
||||
dataSourceAPIBuilder, err := datasource.RegisterAPIService(featureToggles, apiserverService, middlewareHandler, scopedPluginDatasourceProvider, plugincontextProvider, accessControl, registerer, sourcesService)
|
||||
@@ -900,7 +905,7 @@ func Initialize(ctx context.Context, cfg *setting.Cfg, opts Options, apiOpts api
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
apiregistryService := apiregistry.ProvideRegistryServiceSink(dashboardsAPIBuilder, snapshotsAPIBuilder, dataSourceAPIBuilder, folderAPIBuilder, identityAccessManagementAPIBuilder, queryAPIBuilder, userStorageAPIBuilder, apiBuilder, provisioningAPIBuilder, ofrepAPIBuilder, dependencyRegisterer, provisioningDependencyRegisterer)
|
||||
apiregistryService := apiregistry.ProvideRegistryServiceSink(apiExtensionsBuilder, dashboardsAPIBuilder, snapshotsAPIBuilder, dataSourceAPIBuilder, folderAPIBuilder, identityAccessManagementAPIBuilder, queryAPIBuilder, userStorageAPIBuilder, apiBuilder, provisioningAPIBuilder, ofrepAPIBuilder, dependencyRegisterer, provisioningDependencyRegisterer)
|
||||
teamPermissionsService, err := ossaccesscontrol.ProvideTeamPermissions(cfg, featureToggles, routeRegisterImpl, sqlStore, accessControl, ossLicensingService, acimplService, teamService, userService, actionSetService)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -1489,6 +1494,10 @@ func InitializeForTest(ctx context.Context, t sqlutil.ITestDB, testingT interfac
|
||||
identitySynchronizer := authnimpl.ProvideIdentitySynchronizer(authnimplService)
|
||||
ldapImpl := service12.ProvideService(cfg, featureToggles, ssosettingsimplService)
|
||||
apiService := api4.ProvideService(cfg, routeRegisterImpl, accessControl, userService, authinfoimplService, ossGroups, identitySynchronizer, orgService, ldapImpl, userAuthTokenService, bundleregistryService)
|
||||
apiExtensionsBuilder, err := apiextensions.RegisterAPIService(cfg, featureToggles, apiserverService, accessClient, registerer, resourceClient)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
dashboardsAPIBuilder := dashboard.RegisterAPIService(cfg, featureToggles, apiserverService, dashboardService, dashboardProvisioningService, service15, dashboardServiceImpl, dashboardPermissionsService, accessControl, accessClient, provisioningServiceImpl, dashboardsStore, registerer, sqlStore, tracingService, resourceClient, dualwriteService, sortService, quotaService, libraryPanelService, eventualRestConfigProvider, userService, libraryElementService, publicDashboardServiceImpl)
|
||||
snapshotsAPIBuilder := dashboardsnapshot.RegisterAPIService(serviceImpl, apiserverService, cfg, featureToggles, sqlStore, registerer)
|
||||
dataSourceAPIBuilder, err := datasource.RegisterAPIService(featureToggles, apiserverService, middlewareHandler, scopedPluginDatasourceProvider, plugincontextProvider, accessControl, registerer, sourcesService)
|
||||
@@ -1541,7 +1550,7 @@ func InitializeForTest(ctx context.Context, t sqlutil.ITestDB, testingT interfac
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
apiregistryService := apiregistry.ProvideRegistryServiceSink(dashboardsAPIBuilder, snapshotsAPIBuilder, dataSourceAPIBuilder, folderAPIBuilder, identityAccessManagementAPIBuilder, queryAPIBuilder, userStorageAPIBuilder, apiBuilder, provisioningAPIBuilder, ofrepAPIBuilder, dependencyRegisterer, provisioningDependencyRegisterer)
|
||||
apiregistryService := apiregistry.ProvideRegistryServiceSink(apiExtensionsBuilder, dashboardsAPIBuilder, snapshotsAPIBuilder, dataSourceAPIBuilder, folderAPIBuilder, identityAccessManagementAPIBuilder, queryAPIBuilder, userStorageAPIBuilder, apiBuilder, provisioningAPIBuilder, ofrepAPIBuilder, dependencyRegisterer, provisioningDependencyRegisterer)
|
||||
teamPermissionsService, err := ossaccesscontrol.ProvideTeamPermissions(cfg, featureToggles, routeRegisterImpl, sqlStore, accessControl, ossLicensingService, acimplService, teamService, userService, actionSetService)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
73
pkg/services/apiserver/dynamic.go
Normal file
73
pkg/services/apiserver/dynamic.go
Normal file
@@ -0,0 +1,73 @@
|
||||
package apiserver
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"github.com/grafana/grafana/pkg/services/apiserver/builder"
|
||||
)
|
||||
|
||||
// createDynamicHandlerWrapper wraps the not-found handler with dynamic custom resource handlers
|
||||
func (s *service) createDynamicHandlerWrapper(builders []builder.APIGroupBuilder, notFoundHandler http.Handler) http.Handler {
|
||||
// Look for the APIExtensionsBuilder - we'll fetch the handler lazily on each request
|
||||
// because the handler is created during UpdateAPIGroupInfo which happens AFTER this wrapper is installed
|
||||
type dynamicHandlerProvider interface {
|
||||
GetDynamicHandler() http.Handler
|
||||
}
|
||||
|
||||
var dhProvider dynamicHandlerProvider
|
||||
for _, b := range builders {
|
||||
if provider, ok := b.(dynamicHandlerProvider); ok {
|
||||
dhProvider = provider
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if dhProvider == nil {
|
||||
return notFoundHandler
|
||||
}
|
||||
|
||||
// Return a wrapper that lazily fetches and tries the dynamic handler on each request
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
// Check if this is an /apis/ request that might be for a custom resource
|
||||
if strings.HasPrefix(r.URL.Path, "/apis/") {
|
||||
// Fetch the dynamic handler (it will be nil until UpdateAPIGroupInfo creates it)
|
||||
dynamicHandler := dhProvider.GetDynamicHandler()
|
||||
if dynamicHandler != nil {
|
||||
// Create a response recorder to capture what the dynamic handler does
|
||||
recorder := &responseRecorder{
|
||||
ResponseWriter: w,
|
||||
statusCode: 0,
|
||||
}
|
||||
|
||||
dynamicHandler.ServeHTTP(recorder, r)
|
||||
|
||||
// If the dynamic handler handled it (didn't return 404), we're done
|
||||
if recorder.statusCode != 0 && recorder.statusCode != http.StatusNotFound {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Otherwise, fall back to the not-found handler
|
||||
notFoundHandler.ServeHTTP(w, r)
|
||||
})
|
||||
}
|
||||
|
||||
// responseRecorder captures the status code from a handler
|
||||
type responseRecorder struct {
|
||||
http.ResponseWriter
|
||||
statusCode int
|
||||
}
|
||||
|
||||
func (r *responseRecorder) WriteHeader(statusCode int) {
|
||||
r.statusCode = statusCode
|
||||
r.ResponseWriter.WriteHeader(statusCode)
|
||||
}
|
||||
|
||||
func (r *responseRecorder) Write(b []byte) (int, error) {
|
||||
if r.statusCode == 0 {
|
||||
r.statusCode = http.StatusOK
|
||||
}
|
||||
return r.ResponseWriter.Write(b)
|
||||
}
|
||||
@@ -374,16 +374,36 @@ func (s *service) start(ctx context.Context) error {
|
||||
|
||||
notFoundHandler := notfoundhandler.New(s.codecs, genericapifilters.NoMuxAndDiscoveryIncompleteKey)
|
||||
|
||||
var finalHandler http.Handler = notFoundHandler
|
||||
//nolint:staticcheck // not yet migrated to OpenFeature
|
||||
if s.features.IsEnabledGlobally(featuremgmt.FlagApiExtensions) {
|
||||
// Wrap the not-found handler with dynamic custom resource handler
|
||||
finalHandler = s.createDynamicHandlerWrapper(builders, notFoundHandler)
|
||||
}
|
||||
|
||||
if err := appinstaller.RegisterPostStartHooks(s.appInstallers, serverConfig); err != nil {
|
||||
return fmt.Errorf("failed to register post start hooks for app installers: %w", err)
|
||||
}
|
||||
|
||||
// Create the server
|
||||
server, err := serverConfig.Complete().New("grafana-apiserver", genericapiserver.NewEmptyDelegateWithCustomHandler(notFoundHandler))
|
||||
server, err := serverConfig.Complete().New("grafana-apiserver", genericapiserver.NewEmptyDelegateWithCustomHandler(finalHandler))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
//nolint:staticcheck // not yet migrated to OpenFeature
|
||||
if s.features.IsEnabledGlobally(featuremgmt.FlagApiExtensions) {
|
||||
// Inject the server instance into any builders that need it (e.g., APIExtensionsBuilder for dynamic CRD registration)
|
||||
type apiServerSetter interface {
|
||||
SetAPIServer(server *genericapiserver.GenericAPIServer)
|
||||
}
|
||||
for _, b := range builders {
|
||||
if setter, ok := b.(apiServerSetter); ok {
|
||||
setter.SetAPIServer(server)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Install the API group+version for existing builders
|
||||
err = builder.InstallAPIs(s.scheme, s.codecs, server, serverConfig.RESTOptionsGetter, builders, o.StorageOptions,
|
||||
s.metrics,
|
||||
|
||||
@@ -830,6 +830,13 @@ var (
|
||||
Owner: grafanaAppPlatformSquad,
|
||||
RequiresRestart: true,
|
||||
},
|
||||
{
|
||||
Name: "apiExtensions",
|
||||
Description: "Enable Kubernetes CustomResourceDefinition (CRD) support with dynamic API registration",
|
||||
Stage: FeatureStageExperimental,
|
||||
Owner: grafanaAppPlatformSquad,
|
||||
RequiresRestart: true,
|
||||
},
|
||||
{
|
||||
Name: "groupByVariable",
|
||||
Description: "Enable groupBy variable support in scenes dashboards",
|
||||
|
||||
4
pkg/services/featuremgmt/toggles_gen.go
generated
4
pkg/services/featuremgmt/toggles_gen.go
generated
@@ -447,6 +447,10 @@ const (
|
||||
// Enable CAP token based authentication in grafana's embedded kube-aggregator
|
||||
FlagKubernetesAggregatorCapTokenAuth = "kubernetesAggregatorCapTokenAuth"
|
||||
|
||||
// FlagApiExtensions
|
||||
// Enable Kubernetes CustomResourceDefinition (CRD) support with dynamic API registration
|
||||
FlagApiExtensions = "apiExtensions"
|
||||
|
||||
// FlagGroupByVariable
|
||||
// Enable groupBy variable support in scenes dashboards
|
||||
FlagGroupByVariable = "groupByVariable"
|
||||
|
||||
@@ -134,6 +134,16 @@ func (c authzLimitedClient) Check(ctx context.Context, id claims.AuthInfo, req c
|
||||
return claims.CheckResponse{Allowed: true}, nil
|
||||
}
|
||||
|
||||
// Hack, allow creation of Cluster scoped resources (ex. register CRDs)
|
||||
// We need to make sure it is the correct service account,
|
||||
// not just any service account.
|
||||
// This is called when we submit a CRD (not when we list them)
|
||||
// Currently creating them with a Service Account thus the match
|
||||
if req.Namespace == "" && claims.IsIdentityType(id.GetIdentityType(), claims.TypeServiceAccount, claims.TypeAccessPolicy) {
|
||||
span.SetAttributes(attribute.Bool("allowed", true))
|
||||
return claims.CheckResponse{Allowed: true}, nil
|
||||
}
|
||||
|
||||
if !claims.NamespaceMatches(id.GetNamespace(), req.Namespace) {
|
||||
span.SetAttributes(attribute.Bool("allowed", false))
|
||||
span.SetStatus(codes.Error, "Namespace mismatch")
|
||||
@@ -183,7 +193,14 @@ func (c authzLimitedClient) Compile(ctx context.Context, id claims.AuthInfo, req
|
||||
return true
|
||||
}, claims.NoopZookie{}, nil
|
||||
}
|
||||
if !claims.NamespaceMatches(id.GetNamespace(), req.Namespace) {
|
||||
|
||||
// Hack, allow system tokens to be able to
|
||||
// access Cluster scoped resources (ex. register CRDs)
|
||||
// We need to make sure it is the correct service account,
|
||||
// not just any service account
|
||||
isServiceAccnt := req.Namespace == "" && claims.IsIdentityType(id.GetIdentityType(), claims.TypeAccessPolicy)
|
||||
|
||||
if !claims.NamespaceMatches(id.GetNamespace(), req.Namespace) && !isServiceAccnt {
|
||||
span.SetAttributes(attribute.Bool("allowed", false))
|
||||
span.SetStatus(codes.Error, "Namespace mismatch")
|
||||
span.RecordError(claims.ErrNamespaceMismatch)
|
||||
|
||||
Reference in New Issue
Block a user