Compare commits

...

4 Commits

Author          SHA1         Message                                                      Date
Gareth Dawson   cc648c68f5   remove trailing / from metadata requests                     2025-12-19 21:38:03 +09:00
Gareth Dawson   cf35e73ec9   return errors for failed unmarshal                           2025-12-18 22:19:01 +09:00
Gareth Dawson   3116d8a819   OpenTSDB: Migrate annotations to the data source backend    2025-12-17 23:06:52 +09:00
Gareth Dawson   86ff0917ca   OpenTSDB: Migrate metadata queries to data source backend   2025-12-12 21:34:06 +09:00
5 changed files with 502 additions and 30 deletions

View File

@@ -1,10 +1,13 @@
package opentsdb
import (
"encoding/json"
"fmt"
"net/http"
"net/url"
"path"
"sort"
"strings"
"github.com/grafana/grafana-plugin-sdk-go/backend"
)
@@ -65,3 +68,386 @@ func (s *Service) HandleSuggestQuery(rw http.ResponseWriter, req *http.Request)
return
}
}
func (s *Service) HandleAggregatorsQuery(rw http.ResponseWriter, req *http.Request) {
logger := logger.FromContext(req.Context())
dsInfo, err := s.getDSInfo(req.Context(), backend.PluginConfigFromContext(req.Context()))
if err != nil {
http.Error(rw, fmt.Sprintf("failed to get datasource info: %v", err), http.StatusInternalServerError)
return
}
u, err := url.Parse(dsInfo.URL)
if err != nil {
http.Error(rw, fmt.Sprintf("failed to parse datasource URL: %v", err), http.StatusInternalServerError)
return
}
u.Path = path.Join(u.Path, "api/aggregators")
httpReq, err := http.NewRequestWithContext(req.Context(), http.MethodGet, u.String(), nil)
if err != nil {
http.Error(rw, fmt.Sprintf("failed to create request: %v", err), http.StatusInternalServerError)
return
}
res, err := dsInfo.HTTPClient.Do(httpReq)
if err != nil {
http.Error(rw, fmt.Sprintf("failed to execute request: %v", err), http.StatusInternalServerError)
return
}
defer func() {
if err := res.Body.Close(); err != nil {
logger.Error("Failed to close response body", "error", err)
}
}()
responseBody, err := DecodeResponseBody(res, logger)
if err != nil {
http.Error(rw, fmt.Sprintf("failed to decode response: %v", err), http.StatusInternalServerError)
return
}
var aggregators []string
if err := json.Unmarshal(responseBody, &aggregators); err != nil {
http.Error(rw, fmt.Sprintf("failed to unmarshal aggregators response: %v", err), http.StatusInternalServerError)
return
}
sort.Strings(aggregators)
sortedResponse, err := json.Marshal(aggregators)
if err != nil {
http.Error(rw, fmt.Sprintf("failed to marshal response: %v", err), http.StatusInternalServerError)
return
}
for name, values := range res.Header {
if name == "Content-Encoding" || name == "Content-Length" {
continue
}
for _, value := range values {
rw.Header().Add(name, value)
}
}
rw.WriteHeader(res.StatusCode)
if _, err := rw.Write(sortedResponse); err != nil {
logger.Error("Failed to write response", "error", err)
return
}
}
func (s *Service) HandleFiltersQuery(rw http.ResponseWriter, req *http.Request) {
logger := logger.FromContext(req.Context())
dsInfo, err := s.getDSInfo(req.Context(), backend.PluginConfigFromContext(req.Context()))
if err != nil {
http.Error(rw, fmt.Sprintf("failed to get datasource info: %v", err), http.StatusInternalServerError)
return
}
u, err := url.Parse(dsInfo.URL)
if err != nil {
http.Error(rw, fmt.Sprintf("failed to parse datasource URL: %v", err), http.StatusInternalServerError)
return
}
u.Path = path.Join(u.Path, "/api/config/filters")
httpReq, err := http.NewRequestWithContext(req.Context(), http.MethodGet, u.String(), nil)
if err != nil {
http.Error(rw, fmt.Sprintf("failed to create request: %v", err), http.StatusInternalServerError)
return
}
res, err := dsInfo.HTTPClient.Do(httpReq)
if err != nil {
http.Error(rw, fmt.Sprintf("failed to execute request: %v", err), http.StatusInternalServerError)
return
}
defer func() {
if err := res.Body.Close(); err != nil {
logger.Error("Failed to close response body", "error", err)
}
}()
responseBody, err := DecodeResponseBody(res, logger)
if err != nil {
http.Error(rw, fmt.Sprintf("failed to decode response: %v", err), http.StatusInternalServerError)
return
}
var filters map[string]json.RawMessage
if err := json.Unmarshal(responseBody, &filters); err != nil {
http.Error(rw, fmt.Sprintf("failed to unmarshal filters response: %v", err), http.StatusInternalServerError)
return
}
keys := make([]string, 0, len(filters))
for key := range filters {
keys = append(keys, key)
}
sort.Strings(keys)
sortedResponse, err := json.Marshal(keys)
if err != nil {
http.Error(rw, fmt.Sprintf("failed to marshal response: %v", err), http.StatusInternalServerError)
return
}
for name, values := range res.Header {
if name == "Content-Encoding" || name == "Content-Length" {
continue
}
for _, value := range values {
rw.Header().Add(name, value)
}
}
rw.WriteHeader(res.StatusCode)
if _, err := rw.Write(sortedResponse); err != nil {
logger.Error("Failed to write response", "error", err)
return
}
}
func (s *Service) HandleLookupQuery(rw http.ResponseWriter, req *http.Request) {
queryParams := req.URL.Query()
typeParam := queryParams.Get("type")
if typeParam == "" {
http.Error(rw, "missing 'type' parameter", http.StatusBadRequest)
return
}
switch typeParam {
case "key":
s.HandleKeyLookup(rw, req, queryParams)
case "keyvalue":
s.HandleKeyValueLookup(rw, req, queryParams)
default:
http.Error(rw, fmt.Sprintf("unsupported type: %s", typeParam), http.StatusBadRequest)
return
}
}
func (s *Service) HandleKeyLookup(rw http.ResponseWriter, req *http.Request, queryParams url.Values) {
logger := logger.FromContext(req.Context())
dsInfo, err := s.getDSInfo(req.Context(), backend.PluginConfigFromContext(req.Context()))
if err != nil {
http.Error(rw, fmt.Sprintf("failed to get datasource info: %v", err), http.StatusInternalServerError)
return
}
metric := queryParams.Get("metric")
if metric == "" {
http.Error(rw, "missing 'metric' parameter", http.StatusBadRequest)
return
}
u, err := url.Parse(dsInfo.URL)
if err != nil {
http.Error(rw, fmt.Sprintf("failed to parse datasource URL: %v", err), http.StatusInternalServerError)
return
}
u.Path = path.Join(u.Path, "api/search/lookup")
lookupQueryParams := u.Query()
lookupQueryParams.Set("m", metric)
lookupQueryParams.Set("limit", "1000")
u.RawQuery = lookupQueryParams.Encode()
httpReq, err := http.NewRequestWithContext(req.Context(), http.MethodGet, u.String(), nil)
if err != nil {
http.Error(rw, fmt.Sprintf("failed to create request: %v", err), http.StatusInternalServerError)
return
}
res, err := dsInfo.HTTPClient.Do(httpReq)
if err != nil {
http.Error(rw, fmt.Sprintf("failed to execute request: %v", err), http.StatusInternalServerError)
return
}
defer func() {
if err := res.Body.Close(); err != nil {
logger.Error("Failed to close response body", "error", err)
}
}()
responseBody, err := DecodeResponseBody(res, logger)
if err != nil {
http.Error(rw, fmt.Sprintf("failed to decode response: %v", err), http.StatusInternalServerError)
return
}
var lookupResponse struct {
Results []struct {
Tags map[string]string `json:"tags"`
} `json:"results"`
}
if err := json.Unmarshal(responseBody, &lookupResponse); err != nil {
http.Error(rw, fmt.Sprintf("failed to unmarshal lookup response: %v", err), http.StatusInternalServerError)
return
}
tagKeysMap := make(map[string]bool)
for _, result := range lookupResponse.Results {
for tagKey := range result.Tags {
tagKeysMap[tagKey] = true
}
}
tagKeys := make([]string, 0, len(tagKeysMap))
for tagKey := range tagKeysMap {
tagKeys = append(tagKeys, tagKey)
}
sort.Strings(tagKeys)
sortedResponse, err := json.Marshal(tagKeys)
if err != nil {
http.Error(rw, fmt.Sprintf("failed to marshal response: %v", err), http.StatusInternalServerError)
return
}
for name, values := range res.Header {
if name == "Content-Encoding" || name == "Content-Length" {
continue
}
for _, value := range values {
rw.Header().Add(name, value)
}
}
rw.Header().Set("Content-Type", "application/json")
rw.WriteHeader(res.StatusCode)
if _, err := rw.Write(sortedResponse); err != nil {
logger.Error("Failed to write response", "error", err)
return
}
}
func (s *Service) HandleKeyValueLookup(rw http.ResponseWriter, req *http.Request, queryParams url.Values) {
logger := logger.FromContext(req.Context())
dsInfo, err := s.getDSInfo(req.Context(), backend.PluginConfigFromContext(req.Context()))
if err != nil {
http.Error(rw, fmt.Sprintf("failed to get datasource info: %v", err), http.StatusInternalServerError)
return
}
metric := queryParams.Get("metric")
if metric == "" {
http.Error(rw, "missing 'metric' parameter", http.StatusBadRequest)
return
}
keys := queryParams.Get("keys")
if keys == "" {
http.Error(rw, "missing 'keys' parameter", http.StatusBadRequest)
return
}
keysArray := strings.Split(keys, ",")
for i := range keysArray {
keysArray[i] = strings.TrimSpace(keysArray[i])
}
if len(keysArray) == 0 {
http.Error(rw, "keys parameter cannot be empty", http.StatusBadRequest)
return
}
key := keysArray[0]
keysQuery := key + "=*"
if len(keysArray) > 1 {
keysQuery += "," + strings.Join(keysArray[1:], ",")
}
m := metric + "{" + keysQuery + "}"
u, err := url.Parse(dsInfo.URL)
if err != nil {
http.Error(rw, fmt.Sprintf("failed to parse datasource URL: %v", err), http.StatusInternalServerError)
return
}
u.Path = path.Join(u.Path, "api/search/lookup")
lookupQueryParams := u.Query()
lookupQueryParams.Set("m", m)
lookupQueryParams.Set("limit", fmt.Sprintf("%d", dsInfo.LookupLimit))
u.RawQuery = lookupQueryParams.Encode()
httpReq, err := http.NewRequestWithContext(req.Context(), http.MethodGet, u.String(), nil)
if err != nil {
http.Error(rw, fmt.Sprintf("failed to create request: %v", err), http.StatusInternalServerError)
return
}
res, err := dsInfo.HTTPClient.Do(httpReq)
if err != nil {
http.Error(rw, fmt.Sprintf("failed to execute request: %v", err), http.StatusInternalServerError)
return
}
defer func() {
if err := res.Body.Close(); err != nil {
logger.Error("Failed to close response body", "error", err)
}
}()
responseBody, err := DecodeResponseBody(res, logger)
if err != nil {
http.Error(rw, fmt.Sprintf("failed to decode response: %v", err), http.StatusInternalServerError)
return
}
var lookupResponse struct {
Results []struct {
Tags map[string]string `json:"tags"`
} `json:"results"`
}
if err := json.Unmarshal(responseBody, &lookupResponse); err != nil {
http.Error(rw, fmt.Sprintf("failed to unmarshal lookup response: %v", err), http.StatusInternalServerError)
return
}
tagValuesMap := make(map[string]bool)
for _, result := range lookupResponse.Results {
if tagValue, exists := result.Tags[key]; exists {
tagValuesMap[tagValue] = true
}
}
tagValues := make([]string, 0, len(tagValuesMap))
for tagValue := range tagValuesMap {
tagValues = append(tagValues, tagValue)
}
sort.Strings(tagValues)
sortedResponse, err := json.Marshal(tagValues)
if err != nil {
http.Error(rw, fmt.Sprintf("failed to marshal response: %v", err), http.StatusInternalServerError)
return
}
for name, values := range res.Header {
if name == "Content-Encoding" || name == "Content-Length" {
continue
}
for _, value := range values {
rw.Header().Add(name, value)
}
}
rw.Header().Set("Content-Type", "application/json")
rw.WriteHeader(res.StatusCode)
if _, err := rw.Write(sortedResponse); err != nil {
logger.Error("Failed to write response", "error", err)
return
}
}
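
Note: the header-copy loop that skips Content-Encoding and Content-Length (the proxied body is decoded and re-marshaled, so the upstream values no longer apply) is repeated verbatim in all four handlers. A possible follow-up is to factor it into helpers along these lines; copyUpstreamHeaders and writeSortedJSON are hypothetical names, not part of this change, and the sketch assumes the imports already present in this file.

// copyUpstreamHeaders forwards headers from the upstream OpenTSDB response,
// skipping Content-Encoding and Content-Length because the proxied body has
// been decoded and re-marshaled, so those upstream values no longer apply.
func copyUpstreamHeaders(rw http.ResponseWriter, res *http.Response) {
	for name, values := range res.Header {
		if name == "Content-Encoding" || name == "Content-Length" {
			continue
		}
		for _, value := range values {
			rw.Header().Add(name, value)
		}
	}
}

// writeSortedJSON marshals v, mirrors the upstream status code, and writes the
// body, returning any marshal or write error so the caller can log it.
func writeSortedJSON(rw http.ResponseWriter, res *http.Response, v any) error {
	payload, err := json.Marshal(v)
	if err != nil {
		http.Error(rw, fmt.Sprintf("failed to marshal response: %v", err), http.StatusInternalServerError)
		return err
	}
	copyUpstreamHeaders(rw, res)
	rw.Header().Set("Content-Type", "application/json")
	rw.WriteHeader(res.StatusCode)
	_, err = rw.Write(payload)
	return err
}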

View File

@@ -152,6 +152,9 @@ func (s *Service) CheckHealth(ctx context.Context, req *backend.CheckHealthReque
func (s *Service) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error {
mux := http.NewServeMux()
mux.HandleFunc("/api/suggest", s.HandleSuggestQuery)
mux.HandleFunc("/api/aggregators", s.HandleAggregatorsQuery)
mux.HandleFunc("/api/config/filters", s.HandleFiltersQuery)
mux.HandleFunc("/api/search/lookup", s.HandleLookupQuery)
handler := httpadapter.New(mux)
return handler.CallResource(ctx, req, sender)

View File

@@ -7,9 +7,16 @@ type OpenTsdbQuery struct {
}
type OpenTsdbCommon struct {
Metric string `json:"metric"`
Tags map[string]string `json:"tags"`
AggregateTags []string `json:"aggregateTags"`
Metric string `json:"metric"`
Tags map[string]string `json:"tags"`
AggregateTags []string `json:"aggregateTags"`
Annotations []OpenTsdbAnnotation `json:"annotations,omitempty"`
GlobalAnnotations []OpenTsdbAnnotation `json:"globalAnnotations,omitempty"`
}
type OpenTsdbAnnotation struct {
Description string `json:"description"`
StartTime float64 `json:"startTime"`
}
type OpenTsdbResponse struct {
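
A rough sketch of how a series carrying annotations unmarshals into the fields added above. The payload is a hand-written example of the OpenTSDB /api/query response shape (annotation objects also carry tsuid, notes, and endTime, which are ignored here); the lowercase mirror types exist only so the snippet compiles on its own.

package main

import (
	"encoding/json"
	"fmt"
)

// Mirrors the OpenTsdbCommon / OpenTsdbAnnotation fields added above.
type openTsdbCommon struct {
	Metric            string               `json:"metric"`
	Tags              map[string]string    `json:"tags"`
	AggregateTags     []string             `json:"aggregateTags"`
	Annotations       []openTsdbAnnotation `json:"annotations,omitempty"`
	GlobalAnnotations []openTsdbAnnotation `json:"globalAnnotations,omitempty"`
}

type openTsdbAnnotation struct {
	Description string  `json:"description"`
	StartTime   float64 `json:"startTime"`
}

func main() {
	// Hand-written example of one series from /api/query with annotations enabled.
	payload := []byte(`{
		"metric": "sys.cpu.user",
		"tags": {"host": "web01"},
		"aggregateTags": [],
		"annotations": [
			{"description": "deploy finished", "startTime": 1733990400}
		],
		"globalAnnotations": [
			{"description": "maintenance window", "startTime": 1733994000}
		]
	}`)

	var series openTsdbCommon
	if err := json.Unmarshal(payload, &series); err != nil {
		panic(err)
	}
	// startTime is in seconds; the frontend multiplies by 1000 for milliseconds.
	fmt.Println(series.Annotations[0].Description, int64(series.Annotations[0].StartTime)*1000)
}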

View File

@@ -198,11 +198,21 @@ func CreateDataFrame(val OpenTsdbCommon, length int, refID string) *data.Frame {
sort.Strings(tagKeys)
tagKeys = append(tagKeys, val.AggregateTags...)
custom := map[string]any{
"tagKeys": tagKeys,
}
if len(val.Annotations) > 0 {
custom["annotations"] = val.Annotations
}
if len(val.GlobalAnnotations) > 0 {
custom["globalAnnotations"] = val.GlobalAnnotations
}
frame := data.NewFrameOfFieldTypes(val.Metric, length, data.FieldTypeTime, data.FieldTypeFloat64)
frame.Meta = &data.FrameMeta{
Type: data.FrameTypeTimeSeriesMulti,
TypeVersion: data.FrameTypeVersion{0, 1},
Custom: map[string]any{"tagKeys": tagKeys},
Custom: custom,
}
frame.RefID = refID
timeField := frame.Fields[0]
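
To make the contract with the frontend explicit, here is a small sketch (assuming only the plugin SDK's data package) of what the custom metadata looks like once serialized, which is the shape annotationEvent reads back as frame.meta.custom.annotations; the values are illustrative, and annotations are plain maps here rather than OpenTsdbAnnotation so the snippet stands alone.

package main

import (
	"encoding/json"
	"fmt"

	"github.com/grafana/grafana-plugin-sdk-go/data"
)

func main() {
	custom := map[string]any{
		"tagKeys": []string{"host"},
		"annotations": []map[string]any{
			{"description": "deploy finished", "startTime": 1733990400},
		},
	}

	frame := data.NewFrameOfFieldTypes("sys.cpu.user", 0, data.FieldTypeTime, data.FieldTypeFloat64)
	frame.Meta = &data.FrameMeta{
		Type:        data.FrameTypeTimeSeriesMulti,
		TypeVersion: data.FrameTypeVersion{0, 1},
		Custom:      custom,
	}

	// The frontend sees this under frame.meta.custom once the frame is serialized.
	out, _ := json.MarshalIndent(frame.Meta, "", "  ")
	fmt.Println(string(out))
}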

View File

@@ -77,8 +77,28 @@ export default class OpenTsDatasource extends DataSourceWithBackend<OpenTsdbQuer
};
}
// Called once per panel (graph)
query(options: DataQueryRequest<OpenTsdbQuery>): Observable<DataQueryResponse> {
if (options.targets.some((target: OpenTsdbQuery) => target.fromAnnotations)) {
const streams: Array<Observable<DataQueryResponse>> = [];
for (const annotation of options.targets) {
if (annotation.target) {
streams.push(
new Observable((subscriber) => {
this.annotationEvent(options, annotation)
.then((events) => subscriber.next({ data: [toDataFrame(events)] }))
.catch((ex) => {
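// grafana fetch throws the error so for annotation consistency among datasources
// we return an empty array which displays as 'no events found'
// in the annotation editor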
return subscriber.next({ data: [toDataFrame([])] });
})
.finally(() => subscriber.complete());
})
);
}
}
return merge(...streams);
}
if (config.featureToggles.opentsdbBackendMigration) {
const hasValidTargets = options.targets.some((target) => target.metric && !target.hide);
if (!hasValidTargets) {
@@ -93,31 +113,6 @@ export default class OpenTsDatasource extends DataSourceWithBackend<OpenTsdbQuer
);
}
// migrate annotations
if (options.targets.some((target: OpenTsdbQuery) => target.fromAnnotations)) {
const streams: Array<Observable<DataQueryResponse>> = [];
for (const annotation of options.targets) {
if (annotation.target) {
streams.push(
new Observable((subscriber) => {
this.annotationEvent(options, annotation)
.then((events) => subscriber.next({ data: [toDataFrame(events)] }))
.catch((ex) => {
// grafana fetch throws the error so for annotation consistency among datasources
// we return an empty array which displays as 'no events found'
// in the annotation editor
return subscriber.next({ data: [toDataFrame([])] });
})
.finally(() => subscriber.complete());
})
);
}
}
return merge(...streams);
}
const start = this.convertToTSDBTime(options.range.raw.from, false, options.timezone);
const end = this.convertToTSDBTime(options.range.raw.to, true, options.timezone);
const qs: any[] = [];
@@ -181,6 +176,58 @@ export default class OpenTsDatasource extends DataSourceWithBackend<OpenTsdbQuer
}
annotationEvent(options: DataQueryRequest, annotation: OpenTsdbQuery): Promise<AnnotationEvent[]> {
if (config.featureToggles.opentsdbBackendMigration) {
const query: OpenTsdbQuery = {
refId: annotation.refId ?? 'Anno',
metric: annotation.target,
aggregator: 'sum',
fromAnnotations: true,
isGlobal: annotation.isGlobal,
disableDownsampling: true,
};
const queryRequest: DataQueryRequest<OpenTsdbQuery> = {
...options,
targets: [query],
};
return lastValueFrom(
super.query(queryRequest).pipe(
map((response) => {
const eventList: AnnotationEvent[] = [];
for (const frame of response.data) {
const annotationObject = annotation.isGlobal
? frame.meta?.custom?.globalAnnotations
: frame.meta?.custom?.annotations;
if (annotationObject && isArray(annotationObject)) {
annotationObject.forEach((ann) => {
const event: AnnotationEvent = {
text: ann.description,
time: Math.floor(ann.startTime) * 1000,
annotation: annotation,
};
eventList.push(event);
});
}
}
return eventList;
})
)
);
}
const start = this.convertToTSDBTime(options.range.raw.from, false, options.timezone);
const end = this.convertToTSDBTime(options.range.raw.to, true, options.timezone);
const qs = [];
@@ -306,6 +353,10 @@ export default class OpenTsDatasource extends DataSourceWithBackend<OpenTsdbQuer
return of([]);
}
if (config.featureToggles.opentsdbBackendMigration) {
return from(this.getResource('api/search/lookup', { type: 'keyvalue', metric, keys }));
}
const keysArray = keys.split(',').map((key) => {
return key.trim();
});
@@ -337,6 +388,11 @@ export default class OpenTsDatasource extends DataSourceWithBackend<OpenTsdbQuer
return of([]);
}
if (config.featureToggles.opentsdbBackendMigration) {
return from(this.getResource('api/search/lookup', { type: 'key', metric }));
}
return this._get('/api/search/lookup', { m: metric, limit: 1000 }).pipe(
map((result) => {
result = result.data.results;
@@ -450,6 +506,11 @@ export default class OpenTsDatasource extends DataSourceWithBackend<OpenTsdbQuer
return this.aggregatorsPromise;
}
if (config.featureToggles.opentsdbBackendMigration) {
this.aggregatorsPromise = this.getResource('api/aggregators');
return this.aggregatorsPromise;
}
this.aggregatorsPromise = lastValueFrom(
this._get('/api/aggregators').pipe(
map((result) => {
@@ -468,6 +529,11 @@ export default class OpenTsDatasource extends DataSourceWithBackend<OpenTsdbQuer
return this.filterTypesPromise;
}
if (config.featureToggles.opentsdbBackendMigration) {
this.filterTypesPromise = this.getResource('api/config/filters');
return this.filterTypesPromise;
}
this.filterTypesPromise = lastValueFrom(
this._get('/api/config/filters').pipe(
map((result) => {