mirror of
https://github.com/grafana/grafana.git
synced 2026-01-13 07:24:10 +08:00
Compare commits
2 Commits
main
...
increase-r
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
183aac688a | ||
|
|
c9a99fd1ca |
@@ -3,6 +3,7 @@ package server
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/dskit/flagext"
|
||||
@@ -15,11 +16,14 @@ import (
|
||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resource"
|
||||
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
|
||||
"github.com/grpc-ecosystem/go-grpc-middleware/util/metautils"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promauto"
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/health/grpc_health_v1"
|
||||
)
|
||||
|
||||
@@ -111,9 +115,18 @@ func newClientPool(clientCfg grpcclient.Config, log log.Logger, reg prometheus.R
|
||||
Help: "Time spent executing requests to resource server.",
|
||||
Buckets: prometheus.ExponentialBuckets(0.008, 4, 7),
|
||||
}, []string{"operation", "status_code"})
|
||||
factoryRequestRetries := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
|
||||
Name: "resource_server_client_request_retries_total",
|
||||
Help: "Total number of retries for requests to the resource server.",
|
||||
}, []string{"operation"})
|
||||
|
||||
factory := ringclient.PoolInstFunc(func(inst ring.InstanceDesc) (ringclient.PoolClient, error) {
|
||||
unaryInterceptors, streamInterceptors := grpcclient.Instrument(factoryRequestDuration)
|
||||
|
||||
// Add retry interceptors for transient connection issues
|
||||
unaryInterceptors = append(unaryInterceptors, ringClientRetryInterceptor())
|
||||
unaryInterceptors = append(unaryInterceptors, ringClientRetryInstrument(factoryRequestRetries))
|
||||
|
||||
opts, err := clientCfg.DialOption(unaryInterceptors, streamInterceptors, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -135,3 +148,26 @@ func newClientPool(clientCfg grpcclient.Config, log log.Logger, reg prometheus.R
|
||||
|
||||
return ringclient.NewPool(resource.RingName, poolCfg, nil, factory, clientsCount, log)
|
||||
}
|
||||
|
||||
// ringClientRetryInterceptor creates an interceptor to perform retries for unary methods.
|
||||
// It retries on ResourceExhausted and Unavailable codes, which are typical for
|
||||
// transient connection issues and rate limiting.
|
||||
func ringClientRetryInterceptor() grpc.UnaryClientInterceptor {
|
||||
return grpc_retry.UnaryClientInterceptor(
|
||||
grpc_retry.WithMax(3),
|
||||
grpc_retry.WithBackoff(grpc_retry.BackoffExponentialWithJitter(time.Second, 0.5)),
|
||||
grpc_retry.WithCodes(codes.ResourceExhausted, codes.Unavailable),
|
||||
)
|
||||
}
|
||||
|
||||
// ringClientRetryInstrument creates an interceptor to count retry attempts for metrics.
|
||||
func ringClientRetryInstrument(metric *prometheus.CounterVec) grpc.UnaryClientInterceptor {
|
||||
return func(ctx context.Context, method string, req, resp interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
|
||||
// We can tell if a call is a retry by checking the retry attempt metadata.
|
||||
attempt, err := strconv.Atoi(metautils.ExtractOutgoing(ctx).Get(grpc_retry.AttemptMetadataKey))
|
||||
if err == nil && attempt > 0 {
|
||||
metric.WithLabelValues(method).Inc()
|
||||
}
|
||||
return invoker(ctx, method, req, resp, cc, opts...)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,11 +11,15 @@ import (
|
||||
"github.com/spf13/pflag"
|
||||
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
genericapiserver "k8s.io/apiserver/pkg/server"
|
||||
"k8s.io/apiserver/pkg/server/options"
|
||||
"k8s.io/client-go/rest"
|
||||
|
||||
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
|
||||
|
||||
apiserverrest "github.com/grafana/grafana/pkg/apiserver/rest"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
secret "github.com/grafana/grafana/pkg/registry/apis/secret/contracts"
|
||||
@@ -232,19 +236,16 @@ func (o *StorageOptions) ApplyTo(serverConfig *genericapiserver.RecommendedConfi
|
||||
if o.StorageType != StorageTypeUnifiedGrpc {
|
||||
return nil
|
||||
}
|
||||
conn, err := grpc.NewClient(o.Address,
|
||||
grpc.WithStatsHandler(otelgrpc.NewClientHandler()),
|
||||
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||
)
|
||||
|
||||
grpcOpts := o.buildGrpcDialOptions()
|
||||
|
||||
conn, err := grpc.NewClient(o.Address, grpcOpts...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var indexConn *grpc.ClientConn
|
||||
if o.SearchServerAddress != "" {
|
||||
indexConn, err = grpc.NewClient(o.SearchServerAddress,
|
||||
grpc.WithStatsHandler(otelgrpc.NewClientHandler()),
|
||||
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||
)
|
||||
indexConn, err = grpc.NewClient(o.SearchServerAddress, grpcOpts...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -293,3 +294,34 @@ func (o *StorageOptions) ApplyTo(serverConfig *genericapiserver.RecommendedConfi
|
||||
serverConfig.RESTOptionsGetter = getter
|
||||
return nil
|
||||
}
|
||||
|
||||
// buildGrpcDialOptions creates gRPC dial options with resilience mechanisms:
|
||||
// - Round-robin load balancing with client-side health checking
|
||||
// - Retry interceptor for transient connection issues
|
||||
// - Keepalive for long-lived connections
|
||||
func (o *StorageOptions) buildGrpcDialOptions() []grpc.DialOption {
|
||||
// Retry interceptor for transient connection issues (codes.Unavailable includes connection refused)
|
||||
retryInterceptor := grpc_retry.UnaryClientInterceptor(
|
||||
grpc_retry.WithMax(3),
|
||||
grpc_retry.WithBackoff(grpc_retry.BackoffExponentialWithJitter(time.Second, 0.5)),
|
||||
grpc_retry.WithCodes(codes.ResourceExhausted, codes.Unavailable),
|
||||
)
|
||||
|
||||
opts := []grpc.DialOption{
|
||||
grpc.WithStatsHandler(otelgrpc.NewClientHandler()),
|
||||
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||
grpc.WithChainUnaryInterceptor(retryInterceptor),
|
||||
// Use round_robin with health checking so unhealthy backends are excluded
|
||||
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin","healthCheckConfig":{"serviceName":""}}`),
|
||||
}
|
||||
|
||||
if o.GrpcClientKeepaliveTime > 0 {
|
||||
opts = append(opts, grpc.WithKeepaliveParams(keepalive.ClientParameters{
|
||||
Time: o.GrpcClientKeepaliveTime,
|
||||
Timeout: 10 * time.Second,
|
||||
PermitWithoutStream: true,
|
||||
}))
|
||||
}
|
||||
|
||||
return opts
|
||||
}
|
||||
|
||||
@@ -288,8 +288,9 @@ func grpcConn(address string, metrics *clientMetrics, clientKeepaliveTime time.D
|
||||
opts = append(opts, grpc.WithStatsHandler(otelgrpc.NewClientHandler()))
|
||||
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||
|
||||
// Use round_robin to balances requests more evenly over the available Storage server.
|
||||
opts = append(opts, grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`))
|
||||
// Use round_robin to balance requests more evenly over the available Storage server.
|
||||
// Enable health checking so round_robin excludes unhealthy backends (e.g., terminated pods).
|
||||
opts = append(opts, grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin","healthCheckConfig":{"serviceName":""}}`))
|
||||
|
||||
// Disable looking up service config from TXT DNS records.
|
||||
// This reduces the number of requests made to the DNS servers.
|
||||
|
||||
Reference in New Issue
Block a user