Mirror of https://github.com/grafana/grafana.git, synced 2025-12-21 03:54:29 +08:00

Compare commits: docs/grafa...gtk-grafan (35 commits)
| Author | SHA1 | Date |
|---|---|---|
| | 49cace9952 | |
| | effb9a1f9a | |
| | dc6dcc5a78 | |
| | e2e30ba8dc | |
| | c644838884 | |
| | c5b49ccab6 | |
| | 42b07f638d | |
| | 91f04e8788 | |
| | 3207a4c37a | |
| | 2c95fae36b | |
| | ef11f2b633 | |
| | af00322124 | |
| | f1a8041666 | |
| | 1d631f3537 | |
| | 2f88b7dd8b | |
| | e57411e4d3 | |
| | db30a16862 | |
| | 9bb9d1f420 | |
| | 10cdb94062 | |
| | 293ffafa01 | |
| | c1e7bcede5 | |
| | 48204060ef | |
| | 5b8d8d6698 | |
| | bf2e29d106 | |
| | 0c989fa3c9 | |
| | 6004f14a38 | |
| | 9ec94f74b9 | |
| | 4a63a94b0d | |
| | 0720aa1d88 | |
| | 738855157e | |
| | 2bb6e6bd4b | |
| | 61cb899515 | |
| | e0a4017838 | |
| | 3c8ff953dd | |
| | cb2eacbdfc | |
```diff
@@ -597,7 +597,7 @@ export interface FeatureToggles {
   */
  alertingPrometheusRulesPrimary?: boolean;
  /**
-  * Used in Logs Drilldown to split queries into multiple queries based on the number of shards
+  * Deprecated. Replace with lokiShardSplitting. Used in Logs Drilldown to split queries into multiple queries based on the number of shards
   */
  exploreLogsShardSplitting?: boolean;
  /**
```
```diff
@@ -1183,6 +1183,10 @@ export interface FeatureToggles {
   */
  ttlPluginInstanceManager?: boolean;
  /**
+  * Send X-Loki-Query-Limits-Context header to Loki on first split request
+  */
+  lokiQueryLimitsContext?: boolean;
+  /**
   * Enables the new version of rudderstack
   * @default false
   */
```
```diff
@@ -50,6 +50,14 @@ export interface LokiDataQuery extends common.DataQuery {
   * Used to override the name of the series.
   */
  legendFormat?: string;
+  /**
+   * The full query plan for split/shard queries. Encoded and sent to Loki via `X-Loki-Query-Limits-Context` header. Requires "lokiQueryLimitsContext" feature flag
+   */
+  limitsContext?: {
+    expr: string;
+    from: number;
+    to: number;
+  };
  /**
   * Used to limit the number of log rows returned.
   */
```
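To make the new field concrete, here is a minimal, hypothetical sketch of a query model carrying `limitsContext` (the shape follows the generated interface above; the expressions and timestamps are illustrative only, not taken from the commit):

```ts
// Hypothetical illustration of the generated LokiDataQuery shape above.
interface LimitsContext {
  expr: string;
  from: number; // epoch milliseconds
  to: number; // epoch milliseconds
}

const query: { refId: string; expr: string; limitsContext?: LimitsContext } = {
  refId: 'A',
  // The sharded sub-query that is actually executed.
  expr: 'count_over_time({service_name="apache", __stream_shard__="2"}[15s])',
  // The full, unsharded query plan advertised to Loki via the header.
  limitsContext: {
    expr: 'count_over_time({service_name="apache"}[15s])',
    from: Date.now() - 60 * 60 * 1000, // one hour ago
    to: Date.now(),
  },
};
```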
```diff
@@ -1041,7 +1041,7 @@ var (
 		},
 		{
 			Name:         "exploreLogsShardSplitting",
-			Description:  "Used in Logs Drilldown to split queries into multiple queries based on the number of shards",
+			Description:  "Deprecated. Replace with lokiShardSplitting. Used in Logs Drilldown to split queries into multiple queries based on the number of shards",
 			Stage:        FeatureStageExperimental,
 			FrontendOnly: true,
 			Owner:        grafanaObservabilityLogsSquad,
```
```diff
@@ -2054,6 +2054,13 @@ var (
 			FrontendOnly: true,
 			Owner:        grafanaPluginsPlatformSquad,
 		},
+		{
+			Name:         "lokiQueryLimitsContext",
+			Description:  "Send X-Loki-Query-Limits-Context header to Loki on first split request",
+			Stage:        FeatureStageExperimental,
+			FrontendOnly: true,
+			Owner:        grafanaObservabilityLogsSquad,
+		},
 		{
 			Name:        "rudderstackUpgrade",
 			Description: "Enables the new version of rudderstack",
```
pkg/services/featuremgmt/toggles_gen.csv (generated, 1 change)
```diff
@@ -264,4 +264,5 @@ kubernetesAnnotations,experimental,@grafana/grafana-backend-services-squad,false
 awsDatasourcesHttpProxy,experimental,@grafana/aws-datasources,false,false,false
 transformationsEmptyPlaceholder,preview,@grafana/datapro,false,false,true
 ttlPluginInstanceManager,experimental,@grafana/plugins-platform-backend,false,false,true
+lokiQueryLimitsContext,experimental,@grafana/observability-logs,false,false,true
 rudderstackUpgrade,experimental,@grafana/grafana-frontend-platform,false,false,true
```
pkg/services/featuremgmt/toggles_gen.go (generated, 6 changes)
```diff
@@ -552,7 +552,7 @@ const (
 	FlagAlertingPrometheusRulesPrimary = "alertingPrometheusRulesPrimary"
 
 	// FlagExploreLogsShardSplitting
-	// Used in Logs Drilldown to split queries into multiple queries based on the number of shards
+	// Deprecated. Replace with lokiShardSplitting. Used in Logs Drilldown to split queries into multiple queries based on the number of shards
 	FlagExploreLogsShardSplitting = "exploreLogsShardSplitting"
 
 	// FlagExploreLogsAggregatedMetrics
```
```diff
@@ -1066,6 +1066,10 @@ const (
 	// Enable TTL plugin instance manager
 	FlagTtlPluginInstanceManager = "ttlPluginInstanceManager"
 
+	// FlagLokiQueryLimitsContext
+	// Send X-Loki-Query-Limits-Context header to Loki on first split request
+	FlagLokiQueryLimitsContext = "lokiQueryLimitsContext"
+
 	// FlagRudderstackUpgrade
 	// Enables the new version of rudderstack
 	FlagRudderstackUpgrade = "rudderstackUpgrade"
```
pkg/services/featuremgmt/toggles_gen.json (generated, 22 changes)
```diff
@@ -1574,11 +1574,14 @@
     {
       "metadata": {
         "name": "exploreLogsShardSplitting",
-        "resourceVersion": "1753448760331",
-        "creationTimestamp": "2024-08-29T13:55:59Z"
+        "resourceVersion": "1763611567823",
+        "creationTimestamp": "2024-08-29T13:55:59Z",
+        "annotations": {
+          "grafana.app/updatedTimestamp": "2025-11-20 04:06:07.82367 +0000 UTC"
+        }
       },
       "spec": {
-        "description": "Used in Logs Drilldown to split queries into multiple queries based on the number of shards",
+        "description": "Deprecated. Replace with lokiShardSplitting. Used in Logs Drilldown to split queries into multiple queries based on the number of shards",
         "stage": "experimental",
         "codeowner": "@grafana/observability-logs",
         "frontend": true
```
```diff
@@ -2636,6 +2639,19 @@
         "codeowner": "@grafana/observability-logs"
       }
     },
+    {
+      "metadata": {
+        "name": "lokiQueryLimitsContext",
+        "resourceVersion": "1763558434858",
+        "creationTimestamp": "2025-11-19T13:20:34Z"
+      },
+      "spec": {
+        "description": "Send X-Loki-Query-Limits-Context header to Loki on first split request",
+        "stage": "experimental",
+        "codeowner": "@grafana/observability-logs",
+        "frontend": true
+      }
+    },
     {
       "metadata": {
         "name": "lokiQuerySplitting",
```
```diff
@@ -96,6 +96,8 @@ func makeDataRequest(ctx context.Context, lokiDsUrl string, query lokiQuery) (*h
 		return nil, backend.DownstreamError(fmt.Errorf("failed to create request: %w", err))
 	}
 
+	addQueryLimitsHeader(query, req)
+
 	if query.SupportingQueryType != SupportingQueryNone {
 		value := getSupportingQueryHeaderValue(query.SupportingQueryType)
 		if value != "" {
```
```diff
@@ -108,6 +110,15 @@ func makeDataRequest(ctx context.Context, lokiDsUrl string, query lokiQuery) (*h
 	return req, nil
 }
 
+func addQueryLimitsHeader(query lokiQuery, req *http.Request) {
+	if len(query.LimitsContext.Expr) > 0 {
+		queryLimitStr, err := json.Marshal(query.LimitsContext)
+		if err == nil {
+			req.Header.Set("X-Loki-Query-Limits-Context", string(queryLimitStr))
+		}
+	}
+}
+
 type lokiResponseError struct {
 	Message string `json:"message"`
 	TraceID string `json:"traceID,omitempty"`
```
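As a rough sketch of the wire format: the Go code above JSON-marshals the whole `LimitsContext` and sets it verbatim as the header value, skipping the header when `Expr` is empty. The TypeScript below only illustrates that behavior; the exact field encoding is an assumption, since the backend `LimitsContext` stores `From`/`To` as `time.Time`:

```ts
// Illustrative only: mirrors the behavior of addQueryLimitsHeader in the Go diff above.
function buildQueryLimitsHeader(limitsContext: { expr: string; from: number; to: number }): Record<string, string> {
  // An empty expr means "no limits context was supplied", so no header is set.
  if (limitsContext.expr.length === 0) {
    return {};
  }
  return { 'X-Loki-Query-Limits-Context': JSON.stringify(limitsContext) };
}

// Example: { 'X-Loki-Query-Limits-Context': '{"expr":"{cluster=\\"us-central1\\"}","from":0,"to":1}' }
console.log(buildQueryLimitsHeader({ expr: '{cluster="us-central1"}', from: 0, to: 1 }));
```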
```diff
@@ -2,10 +2,12 @@ package loki
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
 	"net/http"
 	"strings"
 	"testing"
+	"time"
 
 	"github.com/grafana/grafana-plugin-sdk-go/backend"
 	"github.com/grafana/grafana/pkg/tsdb/loki/kinds/dataquery"
```
```diff
@@ -47,6 +49,56 @@ func TestApiLogVolume(t *testing.T) {
 		require.True(t, called)
 	})
 
+	t.Run("X-Loki-Query-Limits-Context header should be set when LimitsContext is provided", func(t *testing.T) {
+		called := false
+		from := time.Now().Truncate(time.Millisecond).Add(-1 * time.Hour)
+		to := time.Now().Truncate(time.Millisecond)
+		limitsContext := LimitsContext{
+			Expr: "{cluster=\"us-central1\"}",
+			From: from,
+			To:   to,
+		}
+
+		limitsContextJson, _ := json.Marshal(limitsContext)
+		api := makeMockedAPI(200, "application/json", response, func(req *http.Request) {
+			called = true
+			require.Equal(t, string(limitsContextJson), req.Header.Get("X-Loki-Query-Limits-Context"))
+		})
+		_, err := api.DataQuery(context.Background(), lokiQuery{Expr: "", SupportingQueryType: SupportingQueryLogsSample, QueryType: QueryTypeRange, LimitsContext: limitsContext}, ResponseOpts{})
+		require.NoError(t, err)
+		require.True(t, called)
+	})
+
+	t.Run("X-Loki-Query-Limits-Context header should not get set when LimitsContext is missing expr", func(t *testing.T) {
+		called := false
+		from := time.Now().Truncate(time.Millisecond).Add(-1 * time.Hour)
+		to := time.Now().Truncate(time.Millisecond)
+		limitsContext := LimitsContext{
+			Expr: "",
+			From: from,
+			To:   to,
+		}
+
+		api := makeMockedAPI(200, "application/json", response, func(req *http.Request) {
+			called = true
+			require.Equal(t, "", req.Header.Get("X-Loki-Query-Limits-Context"))
+		})
+		_, err := api.DataQuery(context.Background(), lokiQuery{Expr: "", SupportingQueryType: SupportingQueryLogsSample, QueryType: QueryTypeRange, LimitsContext: limitsContext}, ResponseOpts{})
+		require.NoError(t, err)
+		require.True(t, called)
+	})
+
+	t.Run("X-Loki-Query-Limits-Context header should not get set when LimitsContext is not provided", func(t *testing.T) {
+		called := false
+		api := makeMockedAPI(200, "application/json", response, func(req *http.Request) {
+			called = true
+			require.Equal(t, "", req.Header.Get("X-Loki-Query-Limits-Context"))
+		})
+		_, err := api.DataQuery(context.Background(), lokiQuery{Expr: "", SupportingQueryType: SupportingQueryLogsSample, QueryType: QueryTypeRange}, ResponseOpts{})
+		require.NoError(t, err)
+		require.True(t, called)
+	})
+
 	t.Run("data sample queries should set data sample http header", func(t *testing.T) {
 		called := false
 		api := makeMockedAPI(200, "application/json", response, func(req *http.Request) {
```
pkg/tsdb/loki/kinds/dataquery/types_dataquery_gen.go (generated, 13 changes)
```diff
@@ -18,6 +18,17 @@ const (
 	QueryEditorModeBuilder QueryEditorMode = "builder"
 )
 
+type LimitsContext struct {
+	Expr string `json:"expr"`
+	From int64  `json:"from"`
+	To   int64  `json:"to"`
+}
+
+// NewLimitsContext creates a new LimitsContext object.
+func NewLimitsContext() *LimitsContext {
+	return &LimitsContext{}
+}
+
 type LokiQueryType string
 
 const (
```
```diff
@@ -59,6 +70,8 @@ type LokiDataQuery struct {
 	Instant *bool `json:"instant,omitempty"`
 	// Used to set step value for range queries.
 	Step *string `json:"step,omitempty"`
+	// The full query plan for split/shard queries. Encoded and sent to Loki via `X-Loki-Query-Limits-Context` header. Requires "lokiQueryLimitsContext" feature flag
+	LimitsContext *LimitsContext `json:"limitsContext,omitempty"`
 	// A unique identifier for the query within the list of targets.
 	// In server side expressions, the refId is used as a variable name to identify results.
 	// By default, the UI will assign A->Z; however setting meaningful names may be useful.
```
```diff
@@ -156,6 +156,8 @@ func parseQuery(queryContext *backend.QueryDataRequest, logqlScopesEnabled bool)
 
 	expr := interpolateVariables(model.Expr, interval, timeRange, queryType, step)
 
+	limitsConfig := generateLimitsConfig(model, interval, timeRange, queryType, step)
+
 	direction, err := parseDirection(model.Direction)
 	if err != nil {
 		return nil, err
```
```diff
@@ -192,8 +194,21 @@ func parseQuery(queryContext *backend.QueryDataRequest, logqlScopesEnabled bool)
 			RefID:               query.RefID,
 			SupportingQueryType: supportingQueryType,
 			Scopes:              model.Scopes,
+			LimitsContext:       limitsConfig,
 		})
 	}
 
 	return qs, nil
 }
+
+func generateLimitsConfig(model *QueryJSONModel, interval time.Duration, timeRange time.Duration, queryType QueryType, step time.Duration) LimitsContext {
+	var limitsConfig LimitsContext
+	// Only supply limits context config if we have expression, and from and to
+	if model.LimitsContext != nil && model.LimitsContext.Expr != "" && model.LimitsContext.From > 0 && model.LimitsContext.To > 0 {
+		// If a limits expression was provided, interpolate it and parse the time range
+		limitsConfig.Expr = interpolateVariables(model.LimitsContext.Expr, interval, timeRange, queryType, step)
+		limitsConfig.From = time.UnixMilli(model.LimitsContext.From)
+		limitsConfig.To = time.UnixMilli(model.LimitsContext.To)
+	}
+	return limitsConfig
+}
```
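The guard in `generateLimitsConfig` only honors the limits context when all three fields are present, converting the epoch-millisecond bounds with `time.UnixMilli`. A minimal TypeScript restatement of the same rule, with simplified types that are assumptions for illustration:

```ts
// Sketch of the generateLimitsConfig guard above, with simplified types.
interface RawLimitsContext {
  expr: string;
  from: number; // epoch milliseconds
  to: number; // epoch milliseconds
}

function toLimitsConfig(raw: RawLimitsContext | undefined): { expr: string; from: Date; to: Date } | undefined {
  // Only supply a limits context if expr, from and to are all set.
  if (!raw || raw.expr === '' || raw.from <= 0 || raw.to <= 0) {
    return undefined; // the Go code returns the zero-value LimitsContext here
  }
  return { expr: raw.expr, from: new Date(raw.from), to: new Date(raw.to) };
}
```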
```diff
@@ -1,6 +1,7 @@
 package loki
 
 import (
+	"strconv"
 	"testing"
 	"time"
 
```
```diff
@@ -145,6 +146,74 @@ func TestParseQuery(t *testing.T) {
 		require.Equal(t, `{namespace="logish"} |= "problems"`, models[0].Expr)
 	})
 
+	t.Run("parsing query model with invalid query limits context expr", func(t *testing.T) {
+		from := time.Now().Add(-3000 * time.Second)
+		fullFrom := time.Now().Add(-1 * time.Hour)
+		to := time.Now()
+
+		queryContext := &backend.QueryDataRequest{
+			Queries: []backend.DataQuery{
+				{
+					JSON: []byte(`
+					{
+						"expr": "count_over_time({service_name=\"apache\", __stream_shard__=\"2\"}[$__auto])",
+						"format": "time_series",
+						"refId": "A",
+						"limitsContext": {"expr": "", "from": ` + strconv.FormatInt(fullFrom.UnixMilli(), 10) + `, "to": ` + strconv.FormatInt(to.UnixMilli(), 10) + `}
+					}`,
+					),
+					TimeRange: backend.TimeRange{
+						From: from,
+						To:   to,
+					},
+					Interval:      time.Second * 15,
+					MaxDataPoints: 200,
+				},
+			},
+		}
+		models, err := parseQuery(queryContext, true)
+		require.NoError(t, err)
+		require.Equal(t, `count_over_time({service_name="apache", __stream_shard__="2"}[15s])`, models[0].Expr)
+		// If the limits context expression is missing, we don't set any limits context
+		require.Equal(t, ``, models[0].LimitsContext.Expr)
+		require.Equal(t, time.Time{}, models[0].LimitsContext.To)
+		require.Equal(t, time.Time{}, models[0].LimitsContext.From)
+	})
+
+	t.Run("parsing query model with query limits context", func(t *testing.T) {
+		from := time.Now().Add(-3000 * time.Second)
+		fullFrom := time.Now().Add(-1 * time.Hour)
+		to := time.Now()
+
+		queryContext := &backend.QueryDataRequest{
+			Queries: []backend.DataQuery{
+				{
+					JSON: []byte(`
+					{
+						"expr": "count_over_time({service_name=\"apache\", __stream_shard__=\"2\"}[$__auto])",
+						"format": "time_series",
+						"refId": "A",
+						"limitsContext": {"expr": "count_over_time({service_name=\"apache\"}[$__auto])", "from": ` + strconv.FormatInt(fullFrom.UnixMilli(), 10) + `, "to": ` + strconv.FormatInt(to.UnixMilli(), 10) + `}
+					}`,
+					),
+					TimeRange: backend.TimeRange{
+						From: from,
+						To:   to,
+					},
+					Interval:      time.Second * 15,
+					MaxDataPoints: 200,
+				},
+			},
+		}
+		models, err := parseQuery(queryContext, true)
+		require.NoError(t, err)
+		require.Equal(t, time.Second*15, models[0].Step)
+		require.Equal(t, `count_over_time({service_name="apache", __stream_shard__="2"}[15s])`, models[0].Expr)
+		require.Equal(t, `count_over_time({service_name="apache"}[15s])`, models[0].LimitsContext.Expr)
+		require.Equal(t, to.Truncate(time.Millisecond), models[0].LimitsContext.To)
+		require.Equal(t, fullFrom.Truncate(time.Millisecond), models[0].LimitsContext.From)
+	})
+
 	t.Run("interpolate variables, range between 1s and 0.5s", func(t *testing.T) {
 		expr := "go_goroutines $__interval $__interval_ms $__range $__range_s $__range_ms"
 		queryType := dataquery.LokiQueryTypeRange
```
```diff
@@ -11,6 +11,11 @@ import (
 type QueryType = dataquery.LokiQueryType
 type SupportingQueryType = dataquery.SupportingQueryType
 type Direction = dataquery.LokiQueryDirection
+type LimitsContext struct {
+	Expr string
+	From time.Time
+	To   time.Time
+}
 
 const (
 	QueryTypeRange = dataquery.LokiQueryTypeRange
```
```diff
@@ -42,4 +47,5 @@ type lokiQuery struct {
 	RefID               string
 	SupportingQueryType SupportingQueryType
 	Scopes              []scope.ScopeFilter
+	LimitsContext       LimitsContext
 }
```
```diff
@@ -795,6 +795,7 @@ const UnthemedLogs: React.FunctionComponent<Props> = (props: Props) => {
         <PanelChrome
           title={t('explore.unthemed-logs.title-logs-volume', 'Logs volume')}
           collapsible
+          loadingState={logsVolumeData?.state}
           collapsed={!logsVolumeEnabled}
           onToggleCollapse={onToggleLogsVolumeCollapse}
         >
```
```diff
@@ -26,7 +26,7 @@ import { mergeLogsVolumeDataFrames, isLogsVolumeLimited, getLogsVolumeMaximumRan
 import { SupplementaryResultError } from '../SupplementaryResultError';
 
 import { LogsVolumePanel } from './LogsVolumePanel';
-import { isTimeoutErrorResponse } from './utils/logsVolumeResponse';
+import { isClientErrorResponse } from './utils/logsVolumeResponse';
 
 type Props = {
   logsVolumeData: DataQueryResponse | undefined;
```
```diff
@@ -92,7 +92,7 @@ export const LogsVolumePanelList = ({
 
   const canShowPartialData =
     config.featureToggles.lokiShardSplitting && logsVolumeData && logsVolumeData.data.length > 0;
-  const timeoutError = isTimeoutErrorResponse(logsVolumeData);
+  const clientError = isClientErrorResponse(logsVolumeData);
 
   const from = dateTime(Math.max(absoluteRange.from, allLogsVolumeMaximumRange.from));
   const to = dateTime(Math.min(absoluteRange.to, allLogsVolumeMaximumRange.to));
```
```diff
@@ -123,7 +123,7 @@ export const LogsVolumePanelList = ({
         <Trans i18nKey="explore.logs-volume-panel-list.loading">Loading...</Trans>
       </span>
     );
-  } else if (timeoutError && !canShowPartialData) {
+  } else if (clientError && !canShowPartialData) {
     return (
       <SupplementaryResultError
         title={t('explore.logs-volume-panel-list.title-unable-to-show-log-volume', 'Unable to show log volume')}
```
```diff
@@ -184,7 +184,7 @@ export const LogsVolumePanelList = ({
 
   return (
     <div className={styles.listContainer}>
-      {timeoutError && canShowPartialData && (
+      {clientError && canShowPartialData && (
         <SupplementaryResultError
           title={t('explore.logs-volume-panel-list.title-showing-partial-data', 'Showing partial data')}
           message="The query is trying to access too much data and some sharded requests could not be completed. Try decreasing the time range or adding more labels to your query."
```
```diff
@@ -1,6 +1,6 @@
 import { DataQueryResponse } from '@grafana/data';
 
-import { isTimeoutErrorResponse } from './logsVolumeResponse';
+import { isClientErrorResponse } from './logsVolumeResponse';
 
 const errorA =
   'Get "http://localhost:3100/loki/api/v1/query_range?direction=backward&end=1680001200000000000&limit=1000&query=sum+by+%28level%29+%28count_over_time%28%7Bcontainer_name%3D%22docker-compose-app-1%22%7D%5B1h%5D%29%29&start=1679914800000000000&step=3600000ms": net/http: request canceled (Client.Timeout exceeded while awaiting headers)';
```
```diff
@@ -16,7 +16,7 @@ describe('isTimeoutErrorResponse', () => {
           message: timeoutError,
         },
       };
-      expect(isTimeoutErrorResponse(response)).toBe(true);
+      expect(isClientErrorResponse(response)).toBe(true);
     }
   );
   test.each([errorA, errorB])(
```
```diff
@@ -33,7 +33,7 @@ describe('isTimeoutErrorResponse', () => {
          },
        ],
      };
-      expect(isTimeoutErrorResponse(response)).toBe(true);
+      expect(isClientErrorResponse(response)).toBe(true);
     }
   );
   test.each([errorA, errorB])(
```
```diff
@@ -54,13 +54,13 @@ describe('isTimeoutErrorResponse', () => {
          },
        ],
      };
-      expect(isTimeoutErrorResponse(response)).toBe(true);
+      expect(isClientErrorResponse(response)).toBe(true);
     }
   );
   test('does not report false positives', () => {
     const response: DataQueryResponse = {
       data: [],
     };
-    expect(isTimeoutErrorResponse(response)).toBe(false);
+    expect(isClientErrorResponse(response)).toBe(false);
   });
 });
```
```diff
@@ -1,7 +1,8 @@
 import { DataQueryError, DataQueryResponse } from '@grafana/data';
+import { is4xxError } from '@grafana-plugins/loki/responseUtils';
 
 // Currently we can only infer if an error response is a timeout or not.
-export function isTimeoutErrorResponse(response: DataQueryResponse | undefined): boolean {
+export function isClientErrorResponse(response: DataQueryResponse | undefined): boolean {
   if (!response) {
     return false;
   }
```
```diff
@@ -13,6 +14,8 @@ export function isTimeoutErrorResponse(response: DataQueryResponse | undefined):
 
   return errors.some((error: DataQueryError) => {
     const message = `${error.message || error.data?.message}`?.toLowerCase();
-    return message.includes('timeout');
+    return (
+      message.includes('timeout') || message?.includes('the query would read too many bytes') || is4xxError(response)
+    );
   });
 }
```
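After this change the predicate treats three cases as client errors: a timeout message, Loki's "query would read too many bytes" message, and any 4xx status. A condensed restatement, assuming a precomputed lowercase message and a boolean standing in for `is4xxError`:

```ts
// Condensed restatement of the predicate above; illustrative only.
const isClientError = (message: string, has4xxStatus: boolean): boolean =>
  message.includes('timeout') || message.includes('the query would read too many bytes') || has4xxStatus;
```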
```diff
@@ -42,6 +42,14 @@ composableKinds: DataQuery: {
 				instant?: bool
 				// Used to set step value for range queries.
 				step?: string
+				// The full query plan for split/shard queries. Encoded and sent to Loki via `X-Loki-Query-Limits-Context` header. Requires "lokiQueryLimitsContext" feature flag
+				limitsContext?: #LimitsContext
+
+				#LimitsContext: {
+					expr: string
+					from: int64
+					to:   int64
+				}
 
 				#QueryEditorMode: "code" | "builder" @cuetsy(kind="enum")
 
```
```diff
@@ -48,6 +48,14 @@ export interface LokiDataQuery extends common.DataQuery {
   * Used to override the name of the series.
   */
  legendFormat?: string;
+  /**
+   * The full query plan for split/shard queries. Encoded and sent to Loki via `X-Loki-Query-Limits-Context` header. Requires "lokiQueryLimitsContext" feature flag
+   */
+  limitsContext?: {
+    expr: string;
+    from: number;
+    to: number;
+  };
  /**
   * Used to limit the number of log rows returned.
   */
```
```diff
@@ -36,6 +36,11 @@ function getFrameKey(frame: DataFrame): string | undefined {
   return frame.refId ?? frame.name;
 }
 
+/**
+ * @todo test new response is error, current response is not
+ * @param currentResponse
+ * @param newResponse
+ */
 export function combineResponses(currentResponse: DataQueryResponse | null, newResponse: DataQueryResponse) {
   if (!currentResponse) {
     return cloneQueryResponse(newResponse);
```
```diff
@@ -65,6 +70,7 @@ export function combineResponses(currentResponse: DataQueryResponse | null, newR
   const mergedErrors = [...(currentResponse.errors ?? []), ...(newResponse.errors ?? [])];
   if (mergedErrors.length > 0) {
     currentResponse.errors = mergedErrors;
+    currentResponse.state = LoadingState.Error;
   }
 
   // the `.error` attribute is obsolete now,
```
```diff
@@ -75,6 +81,7 @@ export function combineResponses(currentResponse: DataQueryResponse | null, newR
   const mergedError = currentResponse.error ?? newResponse.error;
   if (mergedError != null) {
     currentResponse.error = mergedError;
+    currentResponse.state = LoadingState.Error;
   }
 
   const mergedTraceIds = [...(currentResponse.traceIds ?? []), ...(newResponse.traceIds ?? [])];
```
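The two added `LoadingState.Error` assignments mean that an error carried by any merged sub-response now pins the combined response in the error state. A minimal sketch of that rule, with simplified types that are assumptions for illustration:

```ts
// Minimal sketch of the error propagation added to combineResponses above.
type State = 'Done' | 'Error' | 'Streaming';
interface Resp {
  state: State;
  errors?: string[];
}

function mergeState(current: Resp, next: Resp): Resp {
  const errors = [...(current.errors ?? []), ...(next.errors ?? [])];
  if (errors.length > 0) {
    // Any accumulated error forces the merged state to Error, and done()
    // in the split runner no longer overwrites it with Done.
    return { state: 'Error', errors };
  }
  return { state: next.state };
}
```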
```diff
@@ -222,6 +222,33 @@ export function getMockFrames() {
     length: 2,
   };
 
+  const metricFrameAB: DataFrame = {
+    refId: 'A',
+    fields: [
+      {
+        name: 'Time',
+        type: FieldType.time,
+        config: {},
+        values: [1000000, 2000000, 3000000, 4000000],
+      },
+      {
+        name: 'Value',
+        type: FieldType.number,
+        config: {},
+        values: [6, 7, 5, 4],
+        labels: {
+          level: 'debug',
+        },
+      },
+    ],
+    meta: {
+      notices: [],
+      type: DataFrameType.TimeSeriesMulti,
+      stats: [{ displayName: 'Summary: total bytes processed', unit: 'decbytes', value: 33 }],
+    },
+    length: 4,
+  };
+
   const metricFrameC: DataFrame = {
     refId: 'A',
     name: 'some-time-series',
```
```diff
@@ -305,6 +332,7 @@ export function getMockFrames() {
     metricFrameA,
     metricFrameB,
     metricFrameC,
+    metricFrameAB,
     emptyFrame,
   };
 }
```
```diff
@@ -1,6 +1,6 @@
 import { of } from 'rxjs';
 
-import { DataQueryRequest, dateTime, LoadingState } from '@grafana/data';
+import { DataQueryError, DataQueryRequest, DataQueryResponse, dateTime, LoadingState } from '@grafana/data';
 import { config } from '@grafana/runtime';
 
 import { LokiDatasource } from './datasource';
```
```diff
@@ -16,6 +16,7 @@ jest.mock('uuid', () => ({
 }));
 
 const originalShardingFlagState = config.featureToggles.lokiShardSplitting;
+const originalLokiQueryLimitsContextState = config.featureToggles.lokiQueryLimitsContext;
 const originalErr = console.error;
 beforeEach(() => {
   jest.spyOn(console, 'error').mockImplementation(() => {});
```
```diff
@@ -26,22 +27,23 @@ beforeAll(() => {
     callback();
   });
   config.featureToggles.lokiShardSplitting = false;
+  config.featureToggles.lokiQueryLimitsContext = true;
 });
 afterAll(() => {
   jest.mocked(global.setTimeout).mockReset();
   config.featureToggles.lokiShardSplitting = originalShardingFlagState;
+  config.featureToggles.lokiQueryLimitsContext = originalLokiQueryLimitsContextState;
   console.error = originalErr;
 });
 
 describe('runSplitQuery()', () => {
   let datasource: LokiDatasource;
+  const from = dateTime('2023-02-08T05:00:00.000Z');
+  const to = dateTime('2023-02-10T06:00:00.000Z');
   const range = {
-    from: dateTime('2023-02-08T05:00:00.000Z'),
-    to: dateTime('2023-02-10T06:00:00.000Z'),
-    raw: {
-      from: dateTime('2023-02-08T05:00:00.000Z'),
-      to: dateTime('2023-02-10T06:00:00.000Z'),
-    },
+    from,
+    to,
+    raw: { from, to },
   };
 
   const createRequest = (targets: LokiQuery[], overrides?: Partial<DataQueryRequest<LokiQuery>>) => {
```
```diff
@@ -165,6 +167,19 @@ describe('runSplitQuery()', () => {
           _i: 1676008800000,
         }),
       }),
+      targets: [
+        {
+          expr: 'count_over_time({a="b"}[1m])',
+          legendFormat: undefined,
+          refId: 'A',
+          step: undefined,
+          limitsContext: {
+            expr: 'count_over_time({a="b"}[1m])',
+            from: from.valueOf(),
+            to: to.valueOf(),
+          },
+        },
+      ],
     })
   );
 
```
```diff
@@ -183,6 +198,15 @@ describe('runSplitQuery()', () => {
           _i: 1676005140000,
         }),
       }),
+      targets: [
+        {
+          expr: 'count_over_time({a="b"}[1m])',
+          legendFormat: undefined,
+          refId: 'A',
+          step: undefined,
+          limitsContext: undefined,
+        },
+      ],
     })
   );
 
```
```diff
@@ -201,6 +225,15 @@ describe('runSplitQuery()', () => {
           _i: 1675918740000,
         }),
       }),
+      targets: [
+        {
+          expr: 'count_over_time({a="b"}[1m])',
+          legendFormat: undefined,
+          refId: 'A',
+          step: undefined,
+          limitsContext: undefined,
+        },
+      ],
     })
   );
 });
```
```diff
@@ -225,6 +258,19 @@ describe('runSplitQuery()', () => {
           _i: 1676008800000,
         }),
       }),
+      targets: [
+        {
+          expr: 'count_over_time({a="b"}[1m])',
+          legendFormat: undefined,
+          refId: 'A',
+          step: '10s',
+          limitsContext: {
+            expr: 'count_over_time({a="b"}[1m])',
+            from: from.valueOf(),
+            to: to.valueOf(),
+          },
+        },
+      ],
     })
   );
 
```
```diff
@@ -243,6 +289,15 @@ describe('runSplitQuery()', () => {
           _i: 1676005190000,
         }),
       }),
+      targets: [
+        {
+          expr: 'count_over_time({a="b"}[1m])',
+          legendFormat: undefined,
+          refId: 'A',
+          step: '10s',
+          limitsContext: undefined,
+        },
+      ],
     })
   );
 
```
```diff
@@ -261,21 +316,127 @@ describe('runSplitQuery()', () => {
           _i: 1675918790000,
         }),
       }),
+      targets: [
+        {
+          expr: 'count_over_time({a="b"}[1m])',
+          legendFormat: undefined,
+          refId: 'A',
+          step: '10s',
+          limitsContext: undefined,
+        },
+      ],
     })
   );
   });
 });
 
-  test('Handles and reports errors', async () => {
+  test('Retries 5xx errors', async () => {
+    const { metricFrameA, metricFrameB, metricFrameAB } = getMockFrames();
+    const error: DataQueryError = {
+      message: 'OOPSIE',
+      status: 518,
+    };
+    const errResponse: DataQueryResponse = {
+      state: LoadingState.Error,
+      data: [],
+      errors: [error],
+      key: 'uuid',
+    };
+    const response: DataQueryResponse = {
+      state: LoadingState.Done,
+      data: [metricFrameA],
+      key: 'uuid',
+    };
+    const response2: DataQueryResponse = {
+      state: LoadingState.Done,
+      data: [metricFrameB],
+      key: 'uuid',
+    };
     jest
       .spyOn(datasource, 'runQuery')
-      .mockReturnValue(of({ state: LoadingState.Error, error: { refId: 'A', message: 'Error' }, data: [] }));
+      .mockReturnValueOnce(of(errResponse))
+      .mockReturnValueOnce(of(response))
+      .mockReturnValueOnce(of(response2));
+
+    await expect(runSplitQuery(datasource, request)).toEmitValuesWith((values) => {
+      expect(values).toHaveLength(4);
+      expect(values[0]).toEqual(
+        expect.objectContaining({
+          data: [metricFrameAB],
+          key: 'uuid',
+          state: LoadingState.Done,
+        })
+      );
+    });
+    expect(datasource.runQuery).toHaveBeenCalledTimes(4);
+  });
+
+  test('Handles and reports 5xx error too many bytes', async () => {
+    const error: DataQueryError = {
+      message: 'the query would read too many bytes ...',
+      status: 500,
+    };
+    const response: DataQueryResponse = {
+      state: LoadingState.Error,
+      data: [],
+      errors: [error],
+    };
+    jest.spyOn(datasource, 'runQuery').mockReturnValue(of(response));
     await expect(runSplitQuery(datasource, request)).toEmitValuesWith((values) => {
       expect(values).toHaveLength(1);
       expect(values[0]).toEqual(
-        expect.objectContaining({ error: { refId: 'A', message: 'Error' }, state: LoadingState.Streaming })
+        expect.objectContaining({
+          errors: [error],
+          state: LoadingState.Error,
+        })
       );
     });
+    // Errors are not retried
+    expect(datasource.runQuery).toHaveBeenCalledTimes(1);
+  });
+
+  test('Handles and reports 4xx errors', async () => {
+    const error: DataQueryError = {
+      message: 'BAD REQUEST',
+      status: 418,
+    };
+    const response: DataQueryResponse = {
+      state: LoadingState.Error,
+      data: [],
+      errors: [error],
+    };
+    jest.spyOn(datasource, 'runQuery').mockReturnValue(of(response));
+    await expect(runSplitQuery(datasource, request)).toEmitValuesWith((values) => {
+      expect(values).toHaveLength(1);
+      expect(values[0]).toEqual(
+        expect.objectContaining({
+          errors: [error],
+          state: LoadingState.Error,
+        })
+      );
+    });
+    // Errors are not retried
+    expect(datasource.runQuery).toHaveBeenCalledTimes(1);
+  });
+
+  test('Handles and reports errors (deprecated error)', async () => {
+    jest.spyOn(datasource, 'runQuery').mockReturnValue(
+      of({
+        state: LoadingState.Error,
+        error: { refId: 'A', message: 'the query would read too many bytes ...' },
+        data: [],
+        key: 'uuid',
+      })
+    );
+    await expect(runSplitQuery(datasource, request)).toEmitValuesWith((values) => {
+      expect(values).toHaveLength(1);
+      expect(values[0]).toEqual(
+        expect.objectContaining({
+          error: { refId: 'A', message: 'the query would read too many bytes ...' },
+          state: LoadingState.Error,
+        })
+      );
+    });
+    // Errors are not retried
+    expect(datasource.runQuery).toHaveBeenCalledTimes(1);
   });
 
 describe('Hidden and empty queries', () => {
```
```diff
@@ -8,16 +8,18 @@ import {
   DataQueryResponse,
   DataTopic,
   dateTime,
-  rangeUtil,
-  TimeRange,
   LoadingState,
+  rangeUtil,
+  store,
+  TimeRange,
 } from '@grafana/data';
+import { config } from '@grafana/runtime';
 
 import { LokiDatasource } from './datasource';
 import { splitTimeRange as splitLogsTimeRange } from './logsTimeSplitting';
 import { combineResponses } from './mergeResponses';
 import { splitTimeRange as splitMetricTimeRange } from './metricTimeSplitting';
-import { isLogsQuery, isQueryWithRangeVariable } from './queryUtils';
+import { addQueryLimitsContext, isLogsQuery, isQueryWithRangeVariable } from './queryUtils';
 import { isRetriableError } from './responseUtils';
 import { trackGroupedQueries } from './tracking';
 import { LokiGroupedRequest, LokiQuery, LokiQueryDirection, LokiQueryType } from './types';
```
```diff
@@ -55,6 +57,10 @@ interface QuerySplittingOptions {
    * Do not retry failed queries.
    */
   disableRetry?: boolean;
+  /**
+   * The current index of all query attempts
+   */
+  shardQueryIndex?: number;
 }
 
 /**
```
```diff
@@ -85,6 +91,25 @@ export function adjustTargetsFromResponseState(targets: LokiQuery[], response: D
     })
     .filter((target) => target.maxLines === undefined || target.maxLines > 0);
 }
+
+const addLimitsToSplitRequests = (splitQueryIndex: number, shardQueryIndex: number, requests: LokiGroupedRequest[]) => {
+  // requests has already been mutated
+  return requests.map((r) => ({
+    ...r,
+    request: {
+      ...r.request,
+      targets: r.request.targets.map((t) => {
+        // @todo if we retry the first request, we will strip out the query limits context
+        if (splitQueryIndex === 0 && shardQueryIndex === 0) {
+          // Don't pull from request if it has already been added by `addLimitsToShardGroups`
+          return t.limitsContext === undefined ? addQueryLimitsContext(t, r.request) : t;
+        }
+        return { ...t, limitsContext: undefined };
+      }),
+    },
+  }));
+};
+
 export function runSplitGroupedQueries(
   datasource: LokiDatasource,
   requests: LokiGroupedRequest[],
```
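The effect of `addLimitsToSplitRequests` is that only the very first attempt (split index 0 and shard index 0) carries a limits context; every later sub-request has it stripped. A small sketch of that rule, with simplified types assumed for illustration:

```ts
// Simplified sketch of the per-attempt rule implemented above.
interface Target {
  refId: string;
  limitsContext?: { expr: string; from: number; to: number };
}

function limitsForAttempt(target: Target, splitQueryIndex: number, shardQueryIndex: number): Target {
  if (splitQueryIndex === 0 && shardQueryIndex === 0) {
    return target; // the first sub-request keeps (or is given) the limits context
  }
  return { ...target, limitsContext: undefined }; // later sub-requests drop it
}
```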
```diff
@@ -99,8 +124,15 @@ export function runSplitGroupedQueries(
   let subquerySubscription: Subscription | null = null;
   let retriesMap = new Map<string, number>();
   let retryTimer: ReturnType<typeof setTimeout> | null = null;
+  let splitQueryIndex = 0;
+  const shardQueryIndex = options.shardQueryIndex ?? 0;
 
   const runNextRequest = (subscriber: Subscriber<DataQueryResponse>, requestN: number, requestGroup: number) => {
+    if (config.featureToggles.lokiQueryLimitsContext) {
+      requests = addLimitsToSplitRequests(splitQueryIndex, shardQueryIndex, requests);
+    }
+
+    splitQueryIndex++;
     let retrying = false;
 
     if (subquerySubscription != null) {
```
```diff
@@ -114,7 +146,10 @@ export function runSplitGroupedQueries(
     }
 
     const done = () => {
-      mergedResponse.state = LoadingState.Done;
+      if (mergedResponse.state !== LoadingState.Error) {
+        mergedResponse.state = LoadingState.Done;
+      }
+
       subscriber.next(mergedResponse);
       subscriber.complete();
     };
```
```diff
@@ -189,6 +224,10 @@ export function runSplitGroupedQueries(
         if (!options.skipPartialUpdates) {
           mergedResponse = updateLoadingFrame(mergedResponse, subRequest, longestPartition, requestN);
         }
+
+        if (mergedResponse.state === LoadingState.Error) {
+          done();
+        }
       },
       complete: () => {
         if (retrying) {
```
```diff
@@ -301,7 +340,9 @@ export function runSplitQuery(
   const [logQueries, metricQueries] = partition(normalQueries, (query) => isLogsQuery(query.expr));
 
   request.queryGroupId = uuidv4();
-  const oneDayMs = 24 * 60 * 60 * 1000;
+  // Allow custom split durations for debugging, e.g. `localStorage.setItem('grafana.loki.querySplitInterval', 24 * 60 * 1000) // 1 hour`
+  const debugSplitDuration = parseInt(store.get('grafana.loki.querySplitInterval'), 10);
+  const oneDayMs = debugSplitDuration || 24 * 60 * 60 * 1000;
   const directionPartitionedLogQueries = groupBy(logQueries, (query) =>
     query.direction === LokiQueryDirection.Forward ? LokiQueryDirection.Forward : LokiQueryDirection.Backward
   );
```
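The new `grafana.loki.querySplitInterval` store key allows overriding the default one-day split duration for debugging, as the inline comment suggests. For example, to split by one hour (the value is in milliseconds):

```ts
// Debug override for the split duration; the value is milliseconds.
// Here: one hour instead of the default 24 hours.
localStorage.setItem('grafana.loki.querySplitInterval', String(60 * 60 * 1000));
```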
```diff
@@ -1,6 +1,7 @@
 import { SyntaxNode } from '@lezer/common';
 import { escapeRegExp } from 'lodash';
 
+import { DataQueryRequest } from '@grafana/data';
 import {
   parser,
   LineFilter,
```
```diff
@@ -310,6 +311,7 @@ export function getStreamSelectorsFromQuery(query: string): string[] {
 export function requestSupportsSplitting(allQueries: LokiQuery[]) {
   const queries = allQueries
     .filter((query) => !query.hide)
+    .filter((query) => query.queryType !== LokiQueryType.Instant)
     .filter((query) => !query.refId.includes('do-not-chunk'))
     .filter((query) => query.expr);
 
```
```diff
@@ -425,3 +427,21 @@ export const getSelectorForShardValues = (query: string) => {
   }
   return '';
 };
+
+/**
+ * Adds query plan to shard/split queries
+ * Must be called after interpolation step!
+ *
+ * @param lokiQuery
+ * @param request
+ */
+export const addQueryLimitsContext = (lokiQuery: LokiQuery, request: DataQueryRequest<LokiQuery>) => {
+  return {
+    ...lokiQuery,
+    limitsContext: {
+      expr: lokiQuery.expr,
+      from: request.range.from.toDate().getTime(),
+      to: request.range.to.toDate().getTime(),
+    },
+  };
+};
```
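`addQueryLimitsContext` derives the plan's time bounds from the request range as epoch milliseconds. A self-contained sketch of the same derivation, with a plain `Date` standing in for Grafana's `DateTime`:

```ts
// Sketch of the time-bound derivation in addQueryLimitsContext above.
const range = { from: new Date('2023-02-08T04:00:00Z'), to: new Date('2023-02-08T11:00:00Z') };
const limitsContext = {
  expr: '{a="b"}',
  from: range.from.getTime(), // 1675828800000
  to: range.to.getTime(), // 1675854000000
};
```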
```diff
@@ -1,4 +1,4 @@
-import { DataFrame, DataQueryResponse, FieldType, isValidGoDuration, Labels } from '@grafana/data';
+import { DataFrame, DataQueryError, DataQueryResponse, FieldType, isValidGoDuration, Labels } from '@grafana/data';
 
 import { isBytesString, processLabels } from './languageUtils';
 import { isLogLineJSON, isLogLineLogfmt, isLogLinePacked } from './lineParser';
```
```diff
@@ -134,13 +134,44 @@ export function extractLevelLikeLabelFromDataFrame(frame: DataFrame): string | n
 
 export function isRetriableError(errorResponse: DataQueryResponse) {
   const message = errorResponse.errors
-    ? (errorResponse.errors[0].message ?? '').toLowerCase()
-    : (errorResponse.error?.message ?? '');
+    ? errorResponse.errors
+        .map((err) => err.message ?? '')
+        .join()
+        .toLowerCase()
+    : (errorResponse.error?.message ?? '').toLowerCase();
+
+  // max_query_bytes_read exceeded, currently 500 when should be 4xx
+  if (message.includes('the query would read too many bytes') || is4xxError(errorResponse)) {
+    throw new Error(message);
+  }
   if (message.includes('timeout')) {
     return true;
-  } else if (errorResponse.data.length > 0 && errorResponse.data[0].fields.length > 0) {
+  }
+  if (errorResponse.data.length > 0 && errorResponse.data[0].fields.length > 0) {
     // Error response but we're receiving data, continue querying.
     return false;
   }
+  if (is5xxError(errorResponse)) {
+    return true;
+  }
   throw new Error(message);
 }
+
+export function is4xxError(errorResponse: DataQueryResponse) {
+  /**
+   * Before https://github.com/grafana/grafana/pull/114201 the Loki data source always returns a 500 for every error response type in the response body, and this is what Grafana uses to populate the DataQueryError
+   * Since the frontend and backend are being deployed separately now we might want to continue to check error messages for a bit until we are sure that the correct status code is always set in the data query response.
+   *
+   * @param errorResponse
+   */
+  return isHttpErrorType(errorResponse, '4');
+}
+
+export function is5xxError(errorResponse: DataQueryResponse) {
+  return isHttpErrorType(errorResponse, '5');
+}
+
+function isHttpErrorType(errorResponse: DataQueryResponse, responseType: '2' | '3' | '4' | '5') {
+  const isErrOfType = (err: DataQueryError) => err.status && Array.from(err.status?.toString())[0] === responseType;
+  return (errorResponse.error && isErrOfType(errorResponse.error)) || errorResponse.errors?.some(isErrOfType);
+}
```
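The retry decision above keys off the first digit of the HTTP status carried on the `DataQueryError`; a tiny self-contained sketch of that classification:

```ts
// The first digit of the status decides the class, as in isHttpErrorType above.
const statusClass = (status: number): string => status.toString()[0];

console.log(statusClass(418)); // '4' -> client error: reported, not retried
console.log(statusClass(518)); // '5' -> server error: retried
```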
```diff
@@ -1,6 +1,7 @@
 import { of } from 'rxjs';
 
 import { DataQueryRequest, DataQueryResponse, dateTime, LoadingState } from '@grafana/data';
+import { config } from '@grafana/runtime';
 
 import { LokiDatasource } from './datasource';
 import { createLokiDatasource } from './mocks/datasource';
```
@@ -12,6 +13,8 @@ jest.mock('uuid', () => ({
  v4: jest.fn().mockReturnValue('uuid'),
}));

const originalLokiQueryLimitsContextState = config.featureToggles.lokiQueryLimitsContext;

const originalLog = console.log;
const originalWarn = console.warn;
const originalErr = console.error;
@@ -20,20 +23,26 @@ beforeEach(() => {
  jest.spyOn(console, 'warn').mockImplementation(() => {});
  jest.spyOn(console, 'error').mockImplementation(() => {});
});

beforeAll(() => {
  config.featureToggles.lokiQueryLimitsContext = true;
});

afterAll(() => {
  console.log = originalLog;
  console.warn = originalWarn;
  console.error = originalErr;
  config.featureToggles.lokiQueryLimitsContext = originalLokiQueryLimitsContextState;
});

describe('runShardSplitQuery()', () => {
  let datasource: LokiDatasource;
  const from = dateTime('2023-02-08T04:00:00.000Z');
  const to = dateTime('2023-02-08T11:00:00.000Z');
  const range = {
    from,
    to,
    raw: {
      from,
      to,
    },
  };
@@ -139,6 +148,11 @@ describe('runShardSplitQuery()', () => {
      targets: [
        {
          expr: '{a="b", __stream_shard__=~"20|10"}',
          limitsContext: {
            expr: `{a="b"}`,
            from: from.valueOf(),
            to: to.valueOf(),
          },
          refId: 'A',
          direction: LokiQueryDirection.Scan,
        },
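The limitsContext asserted here is the shard-free selector plus the full request range in milliseconds. How it travels on the wire is not part of this diff; assuming a straightforward JSON serialization when the lokiQueryLimitsContext toggle is on, the context for the first split request would look roughly like this (hypothetical encoding, not from the change):

// Hypothetical serialization; the actual header encoding lives outside this diff.
const limitsContext = {
  expr: '{a="b"}', // original selector without __stream_shard__
  from: 1675828800000, // 2023-02-08T04:00:00.000Z in ms
  to: 1675854000000, // 2023-02-08T11:00:00.000Z in ms
};
const headerValue = JSON.stringify(limitsContext);
// -> '{"expr":"{a=\"b\"}","from":1675828800000,"to":1675854000000}'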
@@ -209,6 +223,11 @@ describe('runShardSplitQuery()', () => {
      targets: [
        {
          expr: '{service_name="test", filter="true", __stream_shard__=~"20|10"}',
          limitsContext: {
            expr: `{service_name="test", filter="true"}`,
            from: from.valueOf(),
            to: to.valueOf(),
          },
          refId: 'A',
          direction: LokiQueryDirection.Scan,
        },
@@ -241,28 +260,113 @@ describe('runShardSplitQuery()', () => {
    });
  });

  describe('Errors', () => {
    beforeEach(() => {
      const querySplittingRange = {
        from: dateTime('2023-02-08T05:00:00.000Z'),
        to: dateTime('2023-02-10T06:00:00.000Z'),
        raw: {
          from: dateTime('2023-02-08T05:00:00.000Z'),
          to: dateTime('2023-02-10T06:00:00.000Z'),
        },
      };
      request = createRequest([{ expr: '$SELECTOR', refId: 'A', direction: LokiQueryDirection.Scan }], {
        range: querySplittingRange,
      });
      // @ts-expect-error
      jest.spyOn(global, 'setTimeout').mockImplementationOnce((callback) => {
        callback();
      });
    });

    test('Failed 4xx responses have loading state Error', async () => {
      jest.mocked(datasource.languageProvider.fetchLabelValues).mockResolvedValue(['1', '12', '5']);
      jest
        .spyOn(datasource, 'runQuery')
        .mockReturnValue(
          of({ state: LoadingState.Error, error: { refId: 'A', message: 'client error', status: 400 }, data: [] })
        );
      await expect(runShardSplitQuery(datasource, request)).toEmitValuesWith((response: DataQueryResponse[]) => {
        expect(response[0].state).toBe(LoadingState.Error);
      });
      expect(datasource.runQuery).toHaveBeenCalledTimes(1);
    });

    test('Max query bytes errors are not retried', async () => {
      const errResp: DataQueryResponse = {
        state: LoadingState.Error,
        errors: [{ refId: 'A', message: 'the query would read too many bytes ...', status: 500 }],
        data: [],
      };
      jest.mocked(datasource.languageProvider.fetchLabelValues).mockResolvedValue(['1', '10', '4']);
      jest
        .spyOn(datasource, 'runQuery')
        .mockReturnValueOnce(of(errResp))
        .mockReturnValueOnce(of({ state: LoadingState.Done, data: [], status: 200 }));

      await expect(runShardSplitQuery(datasource, request)).toEmitValuesWith((response: DataQueryResponse[]) => {
        expect(response[0].state).toBe(LoadingState.Error);
      });

      // The "too many bytes" error is thrown on the first request, so nothing is retried.
      expect(datasource.runQuery).toHaveBeenCalledTimes(1);
    });

    test('Failed 5xx requests are retried', async () => {
      const errResp: DataQueryResponse = {
        state: LoadingState.Error,
        errors: [{ refId: 'A', message: 'parse error', status: 500 }],
        data: [],
      };
      jest.mocked(datasource.languageProvider.fetchLabelValues).mockResolvedValue(['1', '10', '4']);
      jest
        .spyOn(datasource, 'runQuery')
        .mockReturnValueOnce(of(errResp))
        .mockReturnValueOnce(of({ state: LoadingState.Done, data: [], status: 200 }));

      await expect(runShardSplitQuery(datasource, request)).toEmitValuesWith((response: DataQueryResponse[]) => {
        expect(response[0].state).toBe(LoadingState.Done);
      });

      // 3 shard groups + 1 empty shard group = 4 requests per day-long chunk, * 3 chunks = 12 requests, + 1 retried request = 13 requests
      expect(datasource.runQuery).toHaveBeenCalledTimes(13);
    });

    test('Failed 5xx requests are retried (dep)', async () => {
      const errResp: DataQueryResponse = {
        state: LoadingState.Error,
        error: { refId: 'A', message: 'parse error', status: 500 },
        data: [],
      };
      jest.mocked(datasource.languageProvider.fetchLabelValues).mockResolvedValue(['1', '10', '4']);
      jest
        .spyOn(datasource, 'runQuery')
        .mockReturnValueOnce(of(errResp))
        .mockReturnValueOnce(of({ state: LoadingState.Done, data: [], status: 200 }));

      await expect(runShardSplitQuery(datasource, request)).toEmitValuesWith((response: DataQueryResponse[]) => {
        expect(response[0].state).toBe(LoadingState.Done);
      });

      // Same as above, but with the deprecated singular `error` field: 13 requests.
      expect(datasource.runQuery).toHaveBeenCalledTimes(13);
    });

    test('Does not retry on other errors', async () => {
      jest.mocked(datasource.languageProvider.fetchLabelValues).mockResolvedValue(['1']);
      jest
        .spyOn(datasource, 'runQuery')
        .mockReturnValueOnce(
          of({ state: LoadingState.Error, errors: [{ refId: 'A', message: 'nope nope' }], data: [] })
        );
      // @ts-expect-error
      jest.spyOn(global, 'setTimeout').mockImplementationOnce((callback) => {
        callback();
      });
      await expect(runShardSplitQuery(datasource, request)).toEmitValuesWith((response: DataQueryResponse[]) => {
        expect(datasource.runQuery).toHaveBeenCalledTimes(1);
      });
    });
  });
  test('Adjusts the group size based on errors and execution time', async () => {
@@ -425,6 +529,11 @@ describe('runShardSplitQuery()', () => {
      targets: [
        {
          expr: '{a="b", __stream_shard__=~"20|10|9"}',
          limitsContext: {
            expr: `{a="b"}`,
            from: from.valueOf(),
            to: to.valueOf(),
          },
          refId: 'A',
          direction: LokiQueryDirection.Scan,
        },
@@ -2,15 +2,20 @@ import { groupBy, partition } from 'lodash';
import { Observable, Subscriber, Subscription } from 'rxjs';
import { v4 as uuidv4 } from 'uuid';

import { DataQueryRequest, DataQueryResponse, LoadingState, QueryResultMetaStat } from '@grafana/data';
import { config } from '@grafana/runtime';

import { LokiDatasource } from './datasource';
import { combineResponses, replaceResponses } from './mergeResponses';
import { adjustTargetsFromResponseState, runSplitQuery } from './querySplitting';
import {
  addQueryLimitsContext,
  getSelectorForShardValues,
  interpolateShardingSelector,
  requestSupportsSharding,
} from './queryUtils';
import { isRetriableError } from './responseUtils';
import { LokiQuery } from './types';

/**
 * Query splitting by stream shards.
 * Query splitting was introduced in Loki to optimize querying for long intervals and high volume of data,
@@ -54,6 +59,19 @@ export function runShardSplitQuery(datasource: LokiDatasource, request: DataQueryRequest<LokiQuery>) {
  return splitQueriesByStreamShard(datasource, request, queries);
}

const addLimitsToShardGroups = (
  queryIndex: number,
  groups: ShardedQueryGroup[],
  request: DataQueryRequest<LokiQuery>
) => {
  return groups.map((g) => ({
    ...g,
    targets: g.targets.map((t) => {
      return queryIndex === 0 ? addQueryLimitsContext(t, request) : { ...t, limitsContext: undefined };
    }),
  }));
};
function splitQueriesByStreamShard(
  datasource: LokiDatasource,
  request: DataQueryRequest<LokiQuery>,
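addQueryLimitsContext is imported from './queryUtils', but its body is outside this diff. A plausible shape, inferred from the test expectations above (hypothetical sketch, not the committed implementation):

// Sketch only: attach the unsharded selector and the full request range,
// so Loki can account for limits across every split subquery.
export const addQueryLimitsContext = (target: LokiQuery, request: DataQueryRequest<LokiQuery>): LokiQuery => ({
  ...target,
  limitsContext: {
    expr: getSelectorForShardValues(target.expr), // selector without the __stream_shard__ label
    from: request.range.from.valueOf(),
    to: request.range.to.valueOf(),
  },
});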
@@ -64,8 +82,13 @@ function splitQueriesByStreamShard(
  let subquerySubscription: Subscription | null = null;
  let retriesMap = new Map<string, number>();
  let retryTimer: ReturnType<typeof setTimeout> | null = null;
  let queryIndex = 0;

  const runNextRequest = (subscriber: Subscriber<DataQueryResponse>, group: number, groups: ShardedQueryGroup[]) => {
    if (config.featureToggles.lokiQueryLimitsContext) {
      groups = addLimitsToShardGroups(queryIndex, groups, request);
    }
    queryIndex++;
    let nextGroupSize = groups[group].groupSize;
    const { shards, groupSize, cycle } = groups[group];
    let retrying = false;
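With the toggle enabled, the queryIndex bookkeeping above means only the very first shard-group request carries limitsContext; every later request has it cleared. A small trace under assumed fixtures (the ShardedQueryGroup shape is inferred from this file, and `request` stands in for the surrounding DataQueryRequest):

// Assumed fixtures, for illustration only.
declare const request: DataQueryRequest<LokiQuery>;
const target: LokiQuery = { expr: '{a="b"}', refId: 'A' };
const initial = [{ targets: [target], shards: [20, 10], groupSize: 2, cycle: 0 }];

const first = addLimitsToShardGroups(0, initial, request);
// first[0].targets[0].limitsContext -> { expr: '{a="b"}', from: ..., to: ... }
const later = addLimitsToShardGroups(1, first, request);
// later[0].targets[0].limitsContext -> undefined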
@@ -164,6 +187,7 @@
    subquerySubscription = runSplitQuery(datasource, subRequest, {
      skipPartialUpdates: true,
      disableRetry: true,
      shardQueryIndex: queryIndex - 1,
    }).subscribe({
      next: (partialResponse: DataQueryResponse) => {
        if ((partialResponse.errors ?? []).length > 0 || partialResponse.error != null) {