Compare commits

...

2 Commits

Author SHA1 Message Date
Mustafa Sencer Özcan
183aac688a fix: resilience 2026-01-12 15:05:17 +01:00
Mustafa Sencer Özcan
c9a99fd1ca fix: reliability 2026-01-12 15:05:16 +01:00
3 changed files with 79 additions and 10 deletions

View File

@@ -3,6 +3,7 @@ package server
import (
"context"
"fmt"
"strconv"
"time"
"github.com/grafana/dskit/flagext"
@@ -15,11 +16,14 @@ import (
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/storage/unified/resource"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
"github.com/grpc-ecosystem/go-grpc-middleware/util/metautils"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/health/grpc_health_v1"
)
@@ -111,9 +115,18 @@ func newClientPool(clientCfg grpcclient.Config, log log.Logger, reg prometheus.R
Help: "Time spent executing requests to resource server.",
Buckets: prometheus.ExponentialBuckets(0.008, 4, 7),
}, []string{"operation", "status_code"})
factoryRequestRetries := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "resource_server_client_request_retries_total",
Help: "Total number of retries for requests to the resource server.",
}, []string{"operation"})
factory := ringclient.PoolInstFunc(func(inst ring.InstanceDesc) (ringclient.PoolClient, error) {
unaryInterceptors, streamInterceptors := grpcclient.Instrument(factoryRequestDuration)
// Add retry interceptors for transient connection issues
unaryInterceptors = append(unaryInterceptors, ringClientRetryInterceptor())
unaryInterceptors = append(unaryInterceptors, ringClientRetryInstrument(factoryRequestRetries))
opts, err := clientCfg.DialOption(unaryInterceptors, streamInterceptors, nil)
if err != nil {
return nil, err
@@ -135,3 +148,26 @@ func newClientPool(clientCfg grpcclient.Config, log log.Logger, reg prometheus.R
return ringclient.NewPool(resource.RingName, poolCfg, nil, factory, clientsCount, log)
}
// ringClientRetryInterceptor creates an interceptor to perform retries for unary methods.
// It retries on ResourceExhausted and Unavailable codes, which are typical for
// transient connection issues and rate limiting.
func ringClientRetryInterceptor() grpc.UnaryClientInterceptor {
return grpc_retry.UnaryClientInterceptor(
grpc_retry.WithMax(3),
grpc_retry.WithBackoff(grpc_retry.BackoffExponentialWithJitter(time.Second, 0.5)),
grpc_retry.WithCodes(codes.ResourceExhausted, codes.Unavailable),
)
}
// ringClientRetryInstrument creates an interceptor to count retry attempts for metrics.
func ringClientRetryInstrument(metric *prometheus.CounterVec) grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, resp interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
// We can tell if a call is a retry by checking the retry attempt metadata.
attempt, err := strconv.Atoi(metautils.ExtractOutgoing(ctx).Get(grpc_retry.AttemptMetadataKey))
if err == nil && attempt > 0 {
metric.WithLabelValues(method).Inc()
}
return invoker(ctx, method, req, resp, cc, opts...)
}
}

View File

@@ -11,11 +11,15 @@ import (
"github.com/spf13/pflag"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/options"
"k8s.io/client-go/rest"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
apiserverrest "github.com/grafana/grafana/pkg/apiserver/rest"
"github.com/grafana/grafana/pkg/infra/tracing"
secret "github.com/grafana/grafana/pkg/registry/apis/secret/contracts"
@@ -232,19 +236,16 @@ func (o *StorageOptions) ApplyTo(serverConfig *genericapiserver.RecommendedConfi
if o.StorageType != StorageTypeUnifiedGrpc {
return nil
}
conn, err := grpc.NewClient(o.Address,
grpc.WithStatsHandler(otelgrpc.NewClientHandler()),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
grpcOpts := o.buildGrpcDialOptions()
conn, err := grpc.NewClient(o.Address, grpcOpts...)
if err != nil {
return err
}
var indexConn *grpc.ClientConn
if o.SearchServerAddress != "" {
indexConn, err = grpc.NewClient(o.SearchServerAddress,
grpc.WithStatsHandler(otelgrpc.NewClientHandler()),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
indexConn, err = grpc.NewClient(o.SearchServerAddress, grpcOpts...)
if err != nil {
return err
}
@@ -293,3 +294,34 @@ func (o *StorageOptions) ApplyTo(serverConfig *genericapiserver.RecommendedConfi
serverConfig.RESTOptionsGetter = getter
return nil
}
// buildGrpcDialOptions creates gRPC dial options with resilience mechanisms:
// - Round-robin load balancing with client-side health checking
// - Retry interceptor for transient connection issues
// - Keepalive for long-lived connections
func (o *StorageOptions) buildGrpcDialOptions() []grpc.DialOption {
// Retry interceptor for transient connection issues (codes.Unavailable includes connection refused)
retryInterceptor := grpc_retry.UnaryClientInterceptor(
grpc_retry.WithMax(3),
grpc_retry.WithBackoff(grpc_retry.BackoffExponentialWithJitter(time.Second, 0.5)),
grpc_retry.WithCodes(codes.ResourceExhausted, codes.Unavailable),
)
opts := []grpc.DialOption{
grpc.WithStatsHandler(otelgrpc.NewClientHandler()),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithChainUnaryInterceptor(retryInterceptor),
// Use round_robin with health checking so unhealthy backends are excluded
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin","healthCheckConfig":{"serviceName":""}}`),
}
if o.GrpcClientKeepaliveTime > 0 {
opts = append(opts, grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: o.GrpcClientKeepaliveTime,
Timeout: 10 * time.Second,
PermitWithoutStream: true,
}))
}
return opts
}

View File

@@ -288,8 +288,9 @@ func grpcConn(address string, metrics *clientMetrics, clientKeepaliveTime time.D
opts = append(opts, grpc.WithStatsHandler(otelgrpc.NewClientHandler()))
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
// Use round_robin to balances requests more evenly over the available Storage server.
opts = append(opts, grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`))
// Use round_robin to balance requests more evenly over the available Storage server.
// Enable health checking so round_robin excludes unhealthy backends (e.g., terminated pods).
opts = append(opts, grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin","healthCheckConfig":{"serviceName":""}}`))
// Disable looking up service config from TXT DNS records.
// This reduces the number of requests made to the DNS servers.