Open-Telemetry的第三方软件包合集 包括了多个社区中常用库的OpenTelemetry支持。随着 OpenTelemetry的不断迭代,相信整个链路追踪的生态也会越发完善。

gRPC的链路追踪

gRPC 的 trace 数据,推荐使用 otelgrpc

安装依赖。

go get go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc

这里使用一个简单的 hello 程序,其proto文件内容如下。

syntax = "proto3"; // 版本声明,使用Protocol Buffers v3版本

package pb; // 包名


// 定义服务
service Greeter {
    // SayHello 方法
    rpc SayHello (HelloRequest) returns (HelloResponse) {}
}

// 请求消息
message HelloRequest {
    string name = 1;
}

// 响应消息
message HelloResponse {
    string reply = 1;
}

Server端

在创建 gRPC Server 时,按如下方式设置 StatsHandlerotelgrpc.NewServerHandler()

import "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"

// ...

s := grpc.NewServer(
  grpc.StatsHandler(otelgrpc.NewServerHandler()), // 设置 StatsHandler
)

完整代码如下:

package main

import (
	"context"
	"fmt"
	"log"
	"net"
	"time"

	"hello_server/pb"

	"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
	"go.opentelemetry.io/otel/propagation"
	"go.opentelemetry.io/otel/sdk/resource"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
	"go.opentelemetry.io/otel/trace"
	"google.golang.org/grpc"
	"google.golang.org/grpc/metadata"
)

const (
	serviceName       = "gRPC-Jaeger-Demo"
	jaegerRPCEndpoint = "127.0.0.1:4317" // Jaeger RPC端口
)

var tracer = otel.Tracer("grpc-example")

// newJaegerTraceProvider 创建一个 Jaeger Trace Provider
func newJaegerTraceProvider(ctx context.Context) (*sdktrace.TracerProvider, error) {
	// 创建一个使用 HTTP 协议连接本机Jaeger的 Exporter
	exp, err := otlptracegrpc.New(ctx,
		otlptracegrpc.WithEndpoint(jaegerRPCEndpoint),
		otlptracegrpc.WithInsecure())
	if err != nil {
		return nil, err
	}
	res, err := resource.New(ctx, resource.WithAttributes(semconv.ServiceName(serviceName)))
	if err != nil {
		return nil, err
	}
	traceProvider := sdktrace.NewTracerProvider(
		sdktrace.WithResource(res),
		sdktrace.WithSampler(sdktrace.AlwaysSample()), // 采样
		sdktrace.WithBatcher(exp, sdktrace.WithBatchTimeout(time.Second)),
	)
	return traceProvider, nil
}

// initTracer 初始化 Tracer
func initTracer(ctx context.Context) (*sdktrace.TracerProvider, error) {
	tp, err := newJaegerTraceProvider(ctx)
	if err != nil {
		return nil, err
	}

	otel.SetTracerProvider(tp)
	otel.SetTextMapPropagator(
		propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}),
	)
	return tp, nil
}

type server struct {
	pb.UnimplementedGreeterServer
}

func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloResponse, error) {
	md, _ := metadata.FromIncomingContext(ctx)
	_, span := tracer.Start(ctx, "SayHello",
		trace.WithAttributes(
			attribute.String("name", in.GetName()),
			attribute.StringSlice("client-id", md.Get("client-id")),
			attribute.StringSlice("user-id", md.Get("user-id")),
		),
	)
	defer span.End()
	return &pb.HelloResponse{Reply: "Hello " + in.Name}, nil
}

func main() {
	ctx := context.Background()
	tp, err := initTracer(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		if err := tp.Shutdown(context.Background()); err != nil {
			log.Printf("Error shutting down tracer provider: %v", err)
		}
	}()

	// 监听本地的8972端口
	lis, err := net.Listen("tcp", ":8972")
	if err != nil {
		fmt.Printf("failed to listen: %v", err)
		return
	}

	// 创建gRPC服务器
	s := grpc.NewServer(
		grpc.StatsHandler(otelgrpc.NewServerHandler()), // 设置 StatsHandler
	)

	pb.RegisterGreeterServer(s, &server{}) // 在gRPC服务端注册服务
	// 启动服务
	err = s.Serve(lis)
	if err != nil {
		fmt.Printf("failed to serve: %v", err)
		return
	}
}

Client端

在建立连接时,指定 StatsHandlerotelgrpc.NewClientHandler()

import "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"

// ...

conn, err := grpc.NewClient(
  *addr,
  grpc.WithTransportCredentials(insecure.NewCredentials()),
  grpc.WithStatsHandler(otelgrpc.NewClientHandler()), // 设置 StatsHandler
)

完整代码:

package main

import (
	"context"
	"flag"
	"log"
	"time"

	"hello_client/pb"

	"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
	"go.opentelemetry.io/otel/propagation"
	"go.opentelemetry.io/otel/sdk/resource"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
	"go.opentelemetry.io/otel/trace"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/metadata"
)

const (
	serviceName    = "gRPC-Jaeger-Demo"
	jaegerEndpoint = "127.0.0.1:4317"
)

var tracer = otel.Tracer("grpc-client-example")

// newJaegerTraceProvider 创建一个 Jaeger Trace Provider
func newJaegerTraceProvider(ctx context.Context) (*sdktrace.TracerProvider, error) {
	// 创建一个使用 HTTP 协议连接本机Jaeger的 Exporter
	exp, err := otlptracegrpc.New(ctx,
		otlptracegrpc.WithEndpoint(jaegerEndpoint),
		otlptracegrpc.WithInsecure())
	if err != nil {
		return nil, err
	}
	res, err := resource.New(ctx, resource.WithAttributes(semconv.ServiceName(serviceName)))
	if err != nil {
		return nil, err
	}
	traceProvider := sdktrace.NewTracerProvider(
		sdktrace.WithResource(res),
		sdktrace.WithSampler(sdktrace.AlwaysSample()), // 采样
		sdktrace.WithBatcher(exp, sdktrace.WithBatchTimeout(time.Second)),
	)
	return traceProvider, nil
}

// initTracer 初始化 Tracer
func initTracer(ctx context.Context) (*sdktrace.TracerProvider, error) {
	tp, err := newJaegerTraceProvider(ctx)
	if err != nil {
		return nil, err
	}

	otel.SetTracerProvider(tp)
	otel.SetTextMapPropagator(
		propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}),
	)
	return tp, nil
}

const (
	defaultName = "world"
)

var (
	addr = flag.String("addr", "127.0.0.1:8972", "the address to connect to")
	name = flag.String("name", defaultName, "Name to greet")
)

func main() {
	flag.Parse()

	tp, err := initTracer(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		if err := tp.Shutdown(context.Background()); err != nil {
			log.Printf("Error shutting down tracer provider: %v", err)
		}
	}()

	// 连接到server端,此处禁用安全传输
	conn, err := grpc.NewClient(
		*addr,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithStatsHandler(otelgrpc.NewClientHandler()), // 设置 StatsHandler
	)
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer func() { _ = conn.Close() }()

	c := pb.NewGreeterClient(conn)

	// 执行RPC调用并打印收到的响应数据
	md := metadata.Pairs(
		"timestamp", time.Now().Format(time.StampNano),
		"client-id", "hello-client-q1mi",
		"user-id", "7",
	)
	ctx := metadata.NewOutgoingContext(context.Background(), md)
	ctx, span := tracer.Start(ctx, "c.SayHello", trace.WithAttributes(attribute.String("name", "gRPC-client")))
	defer span.End()
	r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name})
	if err != nil {
		log.Fatalf("could not greet: %v", err)
	}
	log.Printf("Greeting: %s", r.GetReply())
}

trace 数据: grpc trace

其他常用库的OTel相关内容


扫码关注微信公众号