Skip to content

gRPC — 高性能 RPC 框架

gRPC 是 Google 开源的高性能 RPC 框架,基于 HTTP/2 和 Protocol Buffers,是微服务间通信的首选。

为什么选 gRPC

特性gRPCREST
协议HTTP/2HTTP/1.1
序列化Protobuf(二进制)JSON(文本)
性能极高中等
流式传输原生支持需要 SSE/WebSocket
代码生成自动生成客户端/服务端需要手动或 OpenAPI
类型安全强类型弱类型

安装工具链

bash
# 安装 protoc 编译器
# Windows: choco install protoc
# macOS: brew install protobuf

# 安装 Go 插件
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest

# 项目依赖
go get google.golang.org/grpc
go get google.golang.org/protobuf

定义 Proto 文件

protobuf
// proto/user/v1/user.proto
syntax = "proto3";

package user.v1;
option go_package = "github.com/myapp/proto/user/v1;userv1";

import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto";

// 用户服务
service UserService {
    // 一元 RPC
    rpc GetUser(GetUserRequest) returns (GetUserResponse);
    rpc CreateUser(CreateUserRequest) returns (CreateUserResponse);
    rpc UpdateUser(UpdateUserRequest) returns (UpdateUserResponse);
    rpc DeleteUser(DeleteUserRequest) returns (google.protobuf.Empty);

    // 服务端流式 RPC
    rpc ListUsers(ListUsersRequest) returns (stream User);

    // 客户端流式 RPC
    rpc BatchCreateUsers(stream CreateUserRequest) returns (BatchCreateResponse);

    // 双向流式 RPC
    rpc Chat(stream ChatMessage) returns (stream ChatMessage);
}

message User {
    int64 id = 1;
    string name = 2;
    string email = 3;
    string role = 4;
    google.protobuf.Timestamp created_at = 5;
}

message GetUserRequest {
    int64 id = 1;
}

message GetUserResponse {
    User user = 1;
}

message CreateUserRequest {
    string name = 1;
    string email = 2;
    string password = 3;
}

message CreateUserResponse {
    User user = 1;
}

message UpdateUserRequest {
    int64 id = 1;
    optional string name = 2;
    optional string email = 3;
}

message UpdateUserResponse {
    User user = 1;
}

message DeleteUserRequest {
    int64 id = 1;
}

message ListUsersRequest {
    int32 page = 1;
    int32 page_size = 2;
}

message BatchCreateResponse {
    int32 created_count = 1;
    repeated string errors = 2;
}

message ChatMessage {
    string user_id = 1;
    string content = 2;
    google.protobuf.Timestamp sent_at = 3;
}
bash
# 生成 Go 代码
protoc --go_out=. --go_opt=paths=source_relative \
       --go-grpc_out=. --go-grpc_opt=paths=source_relative \
       proto/user/v1/user.proto

服务端实现

go
package main

import (
    "context"
    "net"
    "log"

    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
    pb "github.com/myapp/proto/user/v1"
)

// 实现生成的接口
type UserServer struct {
    pb.UnimplementedUserServiceServer  // 嵌入,保证前向兼容
    repo UserRepository
}

// 一元 RPC
func (s *UserServer) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.GetUserResponse, error) {
    if req.Id <= 0 {
        return nil, status.Errorf(codes.InvalidArgument, "无效的用户 ID: %d", req.Id)
    }

    user, err := s.repo.FindByID(ctx, req.Id)
    if err != nil {
        if errors.Is(err, ErrNotFound) {
            return nil, status.Errorf(codes.NotFound, "用户 %d 不存在", req.Id)
        }
        return nil, status.Errorf(codes.Internal, "查询失败: %v", err)
    }

    return &pb.GetUserResponse{
        User: toProtoUser(user),
    }, nil
}

// 服务端流式 RPC
func (s *UserServer) ListUsers(req *pb.ListUsersRequest, stream pb.UserService_ListUsersServer) error {
    users, err := s.repo.List(stream.Context(), int(req.Page), int(req.PageSize))
    if err != nil {
        return status.Errorf(codes.Internal, "查询失败: %v", err)
    }

    for _, user := range users {
        if err := stream.Send(toProtoUser(user)); err != nil {
            return err  // 客户端断开连接
        }
        // 检查 context 是否取消
        select {
        case <-stream.Context().Done():
            return stream.Context().Err()
        default:
        }
    }
    return nil
}

// 双向流式 RPC
func (s *UserServer) Chat(stream pb.UserService_ChatServer) error {
    for {
        msg, err := stream.Recv()
        if err == io.EOF {
            return nil  // 客户端关闭发送
        }
        if err != nil {
            return err
        }

        // 处理消息并回复
        reply := &pb.ChatMessage{
            UserId:  "server",
            Content: "收到: " + msg.Content,
        }
        if err := stream.Send(reply); err != nil {
            return err
        }
    }
}

func main() {
    lis, err := net.Listen("tcp", ":50051")
    if err != nil {
        log.Fatalf("监听失败: %v", err)
    }

    s := grpc.NewServer(
        grpc.ChainUnaryInterceptor(
            LoggingInterceptor,
            RecoveryInterceptor,
            AuthInterceptor,
        ),
        grpc.ChainStreamInterceptor(
            StreamLoggingInterceptor,
        ),
    )

    pb.RegisterUserServiceServer(s, &UserServer{repo: newUserRepo()})

    log.Println("gRPC 服务启动在 :50051")
    if err := s.Serve(lis); err != nil {
        log.Fatalf("服务失败: %v", err)
    }
}

客户端调用

go
func main() {
    // 创建连接
    conn, err := grpc.Dial("localhost:50051",
        grpc.WithTransportCredentials(insecure.NewCredentials()),
        grpc.WithDefaultCallOptions(
            grpc.MaxCallRecvMsgSize(10*1024*1024),  // 10MB
        ),
    )
    if err != nil {
        log.Fatalf("连接失败: %v", err)
    }
    defer conn.Close()

    client := pb.NewUserServiceClient(conn)

    // 一元调用
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    resp, err := client.GetUser(ctx, &pb.GetUserRequest{Id: 1})
    if err != nil {
        // 解析 gRPC 错误
        st, ok := status.FromError(err)
        if ok {
            switch st.Code() {
            case codes.NotFound:
                fmt.Println("用户不存在")
            case codes.DeadlineExceeded:
                fmt.Println("请求超时")
            default:
                fmt.Println("错误:", st.Message())
            }
        }
        return
    }
    fmt.Println("用户:", resp.User.Name)

    // 服务端流
    stream, err := client.ListUsers(ctx, &pb.ListUsersRequest{Page: 1, PageSize: 10})
    if err != nil {
        log.Fatal(err)
    }
    for {
        user, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            log.Fatal(err)
        }
        fmt.Println("用户:", user.Name)
    }
}

拦截器(中间件)

go
// 一元拦截器
func LoggingInterceptor(
    ctx context.Context,
    req interface{},
    info *grpc.UnaryServerInfo,
    handler grpc.UnaryHandler,
) (interface{}, error) {
    start := time.Now()
    resp, err := handler(ctx, req)
    log.Printf("方法: %s, 耗时: %v, 错误: %v",
        info.FullMethod, time.Since(start), err)
    return resp, err
}

// 认证拦截器
func AuthInterceptor(
    ctx context.Context,
    req interface{},
    info *grpc.UnaryServerInfo,
    handler grpc.UnaryHandler,
) (interface{}, error) {
    // 从 metadata 获取 token
    md, ok := metadata.FromIncomingContext(ctx)
    if !ok {
        return nil, status.Error(codes.Unauthenticated, "缺少 metadata")
    }

    tokens := md.Get("authorization")
    if len(tokens) == 0 {
        return nil, status.Error(codes.Unauthenticated, "缺少 token")
    }

    userID, err := validateToken(tokens[0])
    if err != nil {
        return nil, status.Error(codes.Unauthenticated, "无效 token")
    }

    ctx = context.WithValue(ctx, userIDKey, userID)
    return handler(ctx, req)
}

TLS 安全连接

go
// 服务端
creds, err := credentials.NewServerTLSFromFile("server.crt", "server.key")
s := grpc.NewServer(grpc.Creds(creds))

// 客户端
creds, err := credentials.NewClientTLSFromFile("ca.crt", "")
conn, err := grpc.Dial("localhost:50051", grpc.WithTransportCredentials(creds))

gRPC 最佳实践

  • 使用 status.Errorf 返回标准错误码,客户端可以精确处理
  • 流式 RPC 要处理 io.EOF 和 context 取消
  • 生产环境必须启用 TLS
  • 使用 buf 工具管理 proto 文件和代码生成

本站内容由 褚成志 整理编写,仅供学习参考