Compare commits

...

5 Commits

Author | SHA1 | Message | Date
Gareth Dawson | 50258ac75f | Merge branch 'main' into gareth/opentsdb-backend-migration-2 | 2025-12-03 23:01:27 +09:00
Gareth Dawson | 42c5797beb | update backend to support all query options | 2025-12-03 21:02:19 +09:00
Gareth Dawson | 488486cc02 | add tests | 2025-11-18 19:37:32 +09:00
Gareth Dawson | 69f9637751 | move health check to backend | 2025-11-18 18:26:43 +09:00
Gareth Dawson | 6f9f201192 | add feature toggle | 2025-11-18 18:06:39 +09:00
11 changed files with 237 additions and 10 deletions

View File

@@ -68,6 +68,7 @@ Most [generally available](https://grafana.com/docs/release-life-cycle/#general-
| `tabularNumbers` | Use fixed-width numbers globally in the UI | |
| `azureResourcePickerUpdates` | Enables the updated Azure Monitor resource picker | Yes |
| `tempoSearchBackendMigration` | Run search queries through the tempo backend | |
| `opentsdbBackendMigration` | Run queries through the data source backend | |
## Public preview feature toggles
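Note: although the toggle is registered at the GA stage, its expression defaults to `false` and it requires a restart (see the registry entry below), so nothing changes until it is switched on. Enabling it on a local build goes through Grafana's standard toggle mechanism rather than anything in this changeset, for example `opentsdbBackendMigration = true` under `[feature_toggles]` in `custom.ini`, or the `GF_FEATURE_TOGGLES_ENABLE=opentsdbBackendMigration` environment variable, followed by a restart.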

View File

@@ -1190,6 +1190,11 @@ export interface FeatureToggles {
   */
  transformationsEmptyPlaceholder?: boolean;
  /**
   * Run queries through the data source backend
   * @default false
   */
  opentsdbBackendMigration?: boolean;
  /**
   * Enable TTL plugin instance manager
   */
  ttlPluginInstanceManager?: boolean;

View File

@@ -1961,6 +1961,15 @@ var (
			FrontendOnly: true,
			Owner:        grafanaDataProSquad,
		},
		{
			Name:            "opentsdbBackendMigration",
			Description:     "Run queries through the data source backend",
			Stage:           FeatureStageGeneralAvailability,
			Owner:           grafanaOSSBigTent,
			Expression:      "false",
			RequiresRestart: true,
		},
		{
			Name:        "ttlPluginInstanceManager",
			Description: "Enable TTL plugin instance manager",

View File

@@ -266,6 +266,7 @@ panelTimeSettings,experimental,@grafana/dashboards-squad,false,false,false
kubernetesAnnotations,experimental,@grafana/grafana-backend-services-squad,false,false,false
awsDatasourcesHttpProxy,experimental,@grafana/aws-datasources,false,false,false
transformationsEmptyPlaceholder,preview,@grafana/datapro,false,false,true
opentsdbBackendMigration,GA,@grafana/oss-big-tent,false,true,false
ttlPluginInstanceManager,experimental,@grafana/plugins-platform-backend,false,false,true
lokiQueryLimitsContext,experimental,@grafana/observability-logs,false,false,true
rudderstackUpgrade,experimental,@grafana/grafana-frontend-platform,false,false,true

View File

@@ -762,6 +762,10 @@ const (
	// Enables http proxy settings for aws datasources
	FlagAwsDatasourcesHttpProxy = "awsDatasourcesHttpProxy"
	// FlagOpentsdbBackendMigration
	// Run queries through the data source backend
	FlagOpentsdbBackendMigration = "opentsdbBackendMigration"
	// FlagKubernetesAlertingHistorian
	// Adds support for Kubernetes alerting historian APIs
	FlagKubernetesAlertingHistorian = "kubernetesAlertingHistorian"

View File

@@ -2499,6 +2499,20 @@
"expression": "true"
}
},
{
"metadata": {
"name": "opentsdbBackendMigration",
"resourceVersion": "1763456634837",
"creationTimestamp": "2025-11-18T09:03:54Z"
},
"spec": {
"description": "Run queries through the data source backend",
"stage": "GA",
"codeowner": "@grafana/oss-big-tent",
"requiresRestart": true,
"expression": "false"
}
},
{
"metadata": {
"name": "otelLogsFormatting",

View File

@@ -62,6 +62,7 @@ type QueryModel struct {
	IsCounter         bool   `json:"isCounter"`
	CounterMax        string `json:"counterMax"`
	CounterResetValue string `json:"counterResetValue"`
	ExplicitTags      bool   `json:"explicitTags"`
}
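For reference, `explicitTags` maps onto the OpenTSDB sub-query option of the same name (OpenTSDB 2.3+), which restricts results to series whose tag set exactly matches the tags/filters supplied in the query; as the `buildMetric` hunk below shows, the backend only forwards it when the model sets it to true.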
func newInstanceSettings(httpClientProvider *httpclient.Provider) datasource.InstanceFactoryFunc {
@@ -94,6 +95,66 @@ func newInstanceSettings(httpClientProvider *httpclient.Provider) datasource.Ins
	}
}
func (s *Service) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) {
	logger := logger.FromContext(ctx)
	dsInfo, err := s.getDSInfo(ctx, req.PluginContext)
	if err != nil {
		return &backend.CheckHealthResult{
			Status:  backend.HealthStatusError,
			Message: err.Error(),
		}, nil
	}
	u, err := url.Parse(dsInfo.URL)
	if err != nil {
		return &backend.CheckHealthResult{
			Status:  backend.HealthStatusError,
			Message: err.Error(),
		}, nil
	}
	u.Path = path.Join(u.Path, "api/suggest")
	query := u.Query()
	query.Set("q", "cpu")
	query.Set("type", "metrics")
	u.RawQuery = query.Encode()
	httpReq, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
	if err != nil {
		return &backend.CheckHealthResult{
			Status:  backend.HealthStatusError,
			Message: err.Error(),
		}, nil
	}
	res, err := dsInfo.HTTPClient.Do(httpReq)
	if err != nil {
		return &backend.CheckHealthResult{
			Status:  backend.HealthStatusError,
			Message: err.Error(),
		}, nil
	}
	defer func() {
		if err := res.Body.Close(); err != nil {
			logger.Error("Failed to close response body", "error", err)
		}
	}()
	if res.StatusCode != 200 {
		return &backend.CheckHealthResult{
			Status:  backend.HealthStatusError,
			Message: fmt.Sprintf("OpenTSDB suggest endpoint returned status %d", res.StatusCode),
		}, nil
	}
	return &backend.CheckHealthResult{
		Status:  backend.HealthStatusOk,
		Message: "Data source is working",
	}, nil
}
func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
	logger := logger.FromContext(ctx)
@@ -176,8 +237,18 @@ func createInitialFrame(val OpenTsdbCommon, length int, refID string) *data.Fram
		labels[label] = value
	}
	tagKeys := make([]string, 0, len(val.Tags)+len(val.AggregateTags))
	for tagKey := range val.Tags {
		tagKeys = append(tagKeys, tagKey)
	}
	tagKeys = append(tagKeys, val.AggregateTags...)
	frame := data.NewFrameOfFieldTypes(val.Metric, length, data.FieldTypeTime, data.FieldTypeFloat64)
	frame.Meta = &data.FrameMeta{Type: data.FrameTypeTimeSeriesMulti, TypeVersion: data.FrameTypeVersion{0, 1}}
	frame.Meta = &data.FrameMeta{
		Type:        data.FrameTypeTimeSeriesMulti,
		TypeVersion: data.FrameTypeVersion{0, 1},
		Custom:      map[string]any{"tagKeys": tagKeys},
	}
	frame.RefID = refID
	timeField := frame.Fields[0]
	timeField.Name = data.TimeSeriesTimeFieldName
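For context (not part of this changeset): `aggregateTags` is a standard field of OpenTSDB's `/api/query` response listing the tag keys that the aggregator collapsed, so merging it with `tags` reconstructs the full key set per metric before it is stashed in the frame's `meta.custom.tagKeys`. A minimal, self-contained sketch of that shape, mirroring the struct added in the models file further down:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Mirrors the OpenTsdbCommon fields added in this changeset.
type OpenTsdbCommon struct {
	Metric        string            `json:"metric"`
	Tags          map[string]string `json:"tags"`
	AggregateTags []string          `json:"aggregateTags"`
}

func main() {
	// Typical /api/query series: "host" survived as a tag, while "dc" was
	// aggregated away and is therefore reported under aggregateTags.
	raw := []byte(`{"metric":"sys.cpu.user","tags":{"host":"web01"},"aggregateTags":["dc"]}`)

	var s OpenTsdbCommon
	if err := json.Unmarshal(raw, &s); err != nil {
		panic(err)
	}

	// Same merge as createInitialFrame: tags first, then aggregateTags,
	// giving the keys the frontend later reads from meta.custom.tagKeys.
	tagKeys := make([]string, 0, len(s.Tags)+len(s.AggregateTags))
	for k := range s.Tags {
		tagKeys = append(tagKeys, k)
	}
	tagKeys = append(tagKeys, s.AggregateTags...)
	fmt.Println(s.Metric, tagKeys) // sys.cpu.user [host dc]
}
```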
@@ -295,10 +366,19 @@ func (s *Service) buildMetric(query backend.DataQuery) map[string]any {
	if !model.DisableDownsampling {
		downsampleInterval := model.DownsampleInterval
		if downsampleInterval == "" {
			downsampleInterval = "1m" // default value for blank
			if ms := query.Interval.Milliseconds(); ms > 0 {
				downsampleInterval = formatDownsampleInterval(ms)
			} else {
				downsampleInterval = "1m"
			}
		} else if strings.Contains(downsampleInterval, ".") && strings.HasSuffix(downsampleInterval, "s") {
			if val, err := strconv.ParseFloat(strings.TrimSuffix(downsampleInterval, "s"), 64); err == nil {
				downsampleInterval = strconv.FormatInt(int64(val*1000), 10) + "ms"
			}
		}
		downsample := downsampleInterval + "-" + model.DownsampleAggregator
		if model.DownsampleFillPolicy != "none" {
		if model.DownsampleFillPolicy != "" && model.DownsampleFillPolicy != "none" {
			metric["downsample"] = downsample + "-" + model.DownsampleFillPolicy
		} else {
			metric["downsample"] = downsample
@@ -348,6 +428,10 @@ func (s *Service) buildMetric(query backend.DataQuery) map[string]any {
metric["filters"] = model.Filters
}
if model.ExplicitTags {
metric["explicitTags"] = true
}
return metric
}
@@ -364,3 +448,26 @@ func (s *Service) getDSInfo(ctx context.Context, pluginCtx backend.PluginContext
	return instance, nil
}
func formatDownsampleInterval(ms int64) string {
	seconds := ms / 1000
	if seconds < 60 {
		if seconds < 1 {
			return strconv.FormatInt(ms, 10) + "ms"
		}
		return strconv.FormatInt(seconds, 10) + "s"
	}
	minutes := seconds / 60
	if minutes < 60 {
		return strconv.FormatInt(minutes, 10) + "m"
	}
	hours := minutes / 60
	if hours < 24 {
		return strconv.FormatInt(hours, 10) + "h"
	}
	days := hours / 24
	return strconv.FormatInt(days, 10) + "d"
}
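As a sanity check on the new default-interval path (not part of the diff): `formatDownsampleInterval` truncates the panel interval down to the largest whole unit before it is joined into OpenTSDB's `<interval>-<aggregator>[-<fill_policy>]` downsample expression (for example `1m-avg-null`). A minimal table-driven sketch of that behaviour, assuming it sits in the same `opentsdb` package as the function:

```go
package opentsdb

import "testing"

// Sketch only (not in this changeset): expected truncation behaviour of
// formatDownsampleInterval when the query interval supplies the default.
func TestFormatDownsampleIntervalSketch(t *testing.T) {
	cases := map[int64]string{
		500:      "500ms", // sub-second intervals stay in milliseconds
		15000:    "15s",
		90000:    "1m", // 90s truncates down to 1m
		7200000:  "2h",
		86400000: "1d",
	}
	for ms, want := range cases {
		if got := formatDownsampleInterval(ms); got != want {
			t.Errorf("formatDownsampleInterval(%d) = %q, want %q", ms, got, want)
		}
	}
}
```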

View File

@@ -18,6 +18,58 @@ import (
"github.com/stretchr/testify/require"
)
func TestCheckHealth(t *testing.T) {
	tests := []struct {
		name            string
		httpStatusCode  int
		expectedStatus  backend.HealthStatus
		expectedMessage string
	}{
		{
			name:            "successful health check",
			httpStatusCode:  200,
			expectedStatus:  backend.HealthStatusOk,
			expectedMessage: "Data source is working",
		},
		{
			name:            "http error",
			httpStatusCode:  500,
			expectedStatus:  backend.HealthStatusError,
			expectedMessage: "OpenTSDB suggest endpoint returned status 500",
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
				assert.Equal(t, "/api/suggest", r.URL.Path)
				assert.Equal(t, "cpu", r.URL.Query().Get("q"))
				assert.Equal(t, "metrics", r.URL.Query().Get("type"))
				w.WriteHeader(tt.httpStatusCode)
			}))
			defer server.Close()
			pluginCtx := backend.PluginContext{
				DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{
					URL:      server.URL,
					JSONData: []byte(`{}`),
				},
			}
			im := datasource.NewInstanceManager(newInstanceSettings(httpclient.NewProvider()))
			service := &Service{im: im}
			ctx := backend.WithPluginContext(context.Background(), pluginCtx)
			result, err := service.CheckHealth(ctx, &backend.CheckHealthRequest{
				PluginContext: pluginCtx,
			})
			assert.NoError(t, err)
			assert.Equal(t, tt.expectedStatus, result.Status)
			assert.Contains(t, result.Message, tt.expectedMessage)
		})
	}
}
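To exercise just this test locally, something like `go test -run TestCheckHealth ./pkg/tsdb/opentsdb/...` would do (the package path is an assumption based on Grafana's usual source layout).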
func TestOpenTsdbExecutor(t *testing.T) {
service := &Service{}

View File

@@ -10,7 +10,8 @@ import (
)
var (
	_ backend.QueryDataHandler = (*Datasource)(nil)
	_ backend.QueryDataHandler   = (*Datasource)(nil)
	_ backend.CheckHealthHandler = (*Datasource)(nil)
)
type Datasource struct {
@@ -26,3 +27,7 @@ func NewDatasource(context.Context, backend.DataSourceInstanceSettings) (instanc
func (d *Datasource) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
	return d.Service.QueryData(ctx, req)
}
func (d *Datasource) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) {
	return d.Service.CheckHealth(ctx, req)
}
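The blank-identifier `var` block above is the standard Go compile-time assertion that `*Datasource` satisfies both `backend.QueryDataHandler` and `backend.CheckHealthHandler`, so dropping the new `CheckHealth` forwarding later would fail the build rather than silently reverting to no health check.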

View File

@@ -7,8 +7,9 @@ type OpenTsdbQuery struct {
}
type OpenTsdbCommon struct {
	Metric string            `json:"metric"`
	Tags   map[string]string `json:"tags"`
	Metric        string            `json:"metric"`
	Tags          map[string]string `json:"tags"`
	AggregateTags []string          `json:"aggregateTags"`
}
type OpenTsdbResponse struct {

View File

@@ -20,19 +20,25 @@ import {
  AnnotationEvent,
  DataQueryRequest,
  DataQueryResponse,
  DataSourceApi,
  dateMath,
  DateTime,
  ScopedVars,
  toDataFrame,
} from '@grafana/data';
import { FetchResponse, getBackendSrv, getTemplateSrv, TemplateSrv } from '@grafana/runtime';
import {
  config,
  DataSourceWithBackend,
  FetchResponse,
  getBackendSrv,
  getTemplateSrv,
  TemplateSrv,
} from '@grafana/runtime';
import { AnnotationEditor } from './components/AnnotationEditor';
import { prepareAnnotation } from './migrations';
import { OpenTsdbFilter, OpenTsdbOptions, OpenTsdbQuery } from './types';
export default class OpenTsDatasource extends DataSourceApi<OpenTsdbQuery, OpenTsdbOptions> {
export default class OpenTsDatasource extends DataSourceWithBackend<OpenTsdbQuery, OpenTsdbOptions> {
  type: 'opentsdb';
  url: string;
  name: string;
@@ -72,6 +78,15 @@ export default class OpenTsDatasource extends DataSourceApi<OpenTsdbQuery, OpenT
  // Called once per panel (graph)
  query(options: DataQueryRequest<OpenTsdbQuery>): Observable<DataQueryResponse> {
    if (config.featureToggles.opentsdbBackendMigration) {
      return super.query(options).pipe(
        map((response) => {
          this._saveTagKeysFromFrames(response.data);
          return response;
        })
      );
    }
    // migrate annotations
    if (options.targets.some((target: OpenTsdbQuery) => target.fromAnnotations)) {
      const streams: Array<Observable<DataQueryResponse>> = [];
@@ -259,6 +274,15 @@ export default class OpenTsDatasource extends DataSourceApi<OpenTsdbQuery, OpenT
    this.tagKeys[metricData.metric] = tagKeys;
  }
  _saveTagKeysFromFrames(frames: any[]) {
    for (const frame of frames) {
      const tagKeys = frame.meta?.custom?.tagKeys;
      if (frame.name && tagKeys) {
        this.tagKeys[frame.name] = tagKeys;
      }
    }
  }
  _performSuggestQuery(query: string, type: string) {
    return this._get('/api/suggest', { type, q: query, max: this.lookupLimit }).pipe(
      map((result) => {
@@ -397,7 +421,11 @@ export default class OpenTsDatasource extends DataSourceApi<OpenTsdbQuery, OpenT
    return Promise.resolve([]);
  }
  testDatasource() {
  async testDatasource() {
    if (config.featureToggles.opentsdbBackendMigration) {
      return await super.testDatasource();
    }
    return lastValueFrom(
      this._performSuggestQuery('cpu', 'metrics').pipe(
        map(() => {