gRPC — 高性能 RPC 框架
gRPC 是 Google 开源的高性能 RPC 框架,基于 HTTP/2 和 Protocol Buffers,是微服务间通信的首选。
为什么选 gRPC
| 特性 | gRPC | REST |
|---|---|---|
| 协议 | HTTP/2 | HTTP/1.1 |
| 序列化 | Protobuf(二进制) | JSON(文本) |
| 性能 | 极高 | 中等 |
| 流式传输 | 原生支持 | 需要 SSE/WebSocket |
| 代码生成 | 自动生成客户端/服务端 | 需要手动或 OpenAPI |
| 类型安全 | 强类型 | 弱类型 |
安装工具链
bash
# 安装 protoc 编译器
# Windows: choco install protoc
# macOS: brew install protobuf
# 安装 Go 插件
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
# 项目依赖
go get google.golang.org/grpc
go get google.golang.org/protobuf定义 Proto 文件
protobuf
// proto/user/v1/user.proto
syntax = "proto3";
package user.v1;
option go_package = "github.com/myapp/proto/user/v1;userv1";
import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto";
// 用户服务
service UserService {
// 一元 RPC
rpc GetUser(GetUserRequest) returns (GetUserResponse);
rpc CreateUser(CreateUserRequest) returns (CreateUserResponse);
rpc UpdateUser(UpdateUserRequest) returns (UpdateUserResponse);
rpc DeleteUser(DeleteUserRequest) returns (google.protobuf.Empty);
// 服务端流式 RPC
rpc ListUsers(ListUsersRequest) returns (stream User);
// 客户端流式 RPC
rpc BatchCreateUsers(stream CreateUserRequest) returns (BatchCreateResponse);
// 双向流式 RPC
rpc Chat(stream ChatMessage) returns (stream ChatMessage);
}
message User {
int64 id = 1;
string name = 2;
string email = 3;
string role = 4;
google.protobuf.Timestamp created_at = 5;
}
message GetUserRequest {
int64 id = 1;
}
message GetUserResponse {
User user = 1;
}
message CreateUserRequest {
string name = 1;
string email = 2;
string password = 3;
}
message CreateUserResponse {
User user = 1;
}
message UpdateUserRequest {
int64 id = 1;
optional string name = 2;
optional string email = 3;
}
message UpdateUserResponse {
User user = 1;
}
message DeleteUserRequest {
int64 id = 1;
}
message ListUsersRequest {
int32 page = 1;
int32 page_size = 2;
}
message BatchCreateResponse {
int32 created_count = 1;
repeated string errors = 2;
}
message ChatMessage {
string user_id = 1;
string content = 2;
google.protobuf.Timestamp sent_at = 3;
}bash
# 生成 Go 代码
protoc --go_out=. --go_opt=paths=source_relative \
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
proto/user/v1/user.proto服务端实现
go
package main
import (
"context"
"net"
"log"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
pb "github.com/myapp/proto/user/v1"
)
// 实现生成的接口
type UserServer struct {
pb.UnimplementedUserServiceServer // 嵌入,保证前向兼容
repo UserRepository
}
// 一元 RPC
func (s *UserServer) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.GetUserResponse, error) {
if req.Id <= 0 {
return nil, status.Errorf(codes.InvalidArgument, "无效的用户 ID: %d", req.Id)
}
user, err := s.repo.FindByID(ctx, req.Id)
if err != nil {
if errors.Is(err, ErrNotFound) {
return nil, status.Errorf(codes.NotFound, "用户 %d 不存在", req.Id)
}
return nil, status.Errorf(codes.Internal, "查询失败: %v", err)
}
return &pb.GetUserResponse{
User: toProtoUser(user),
}, nil
}
// 服务端流式 RPC
func (s *UserServer) ListUsers(req *pb.ListUsersRequest, stream pb.UserService_ListUsersServer) error {
users, err := s.repo.List(stream.Context(), int(req.Page), int(req.PageSize))
if err != nil {
return status.Errorf(codes.Internal, "查询失败: %v", err)
}
for _, user := range users {
if err := stream.Send(toProtoUser(user)); err != nil {
return err // 客户端断开连接
}
// 检查 context 是否取消
select {
case <-stream.Context().Done():
return stream.Context().Err()
default:
}
}
return nil
}
// 双向流式 RPC
func (s *UserServer) Chat(stream pb.UserService_ChatServer) error {
for {
msg, err := stream.Recv()
if err == io.EOF {
return nil // 客户端关闭发送
}
if err != nil {
return err
}
// 处理消息并回复
reply := &pb.ChatMessage{
UserId: "server",
Content: "收到: " + msg.Content,
}
if err := stream.Send(reply); err != nil {
return err
}
}
}
func main() {
lis, err := net.Listen("tcp", ":50051")
if err != nil {
log.Fatalf("监听失败: %v", err)
}
s := grpc.NewServer(
grpc.ChainUnaryInterceptor(
LoggingInterceptor,
RecoveryInterceptor,
AuthInterceptor,
),
grpc.ChainStreamInterceptor(
StreamLoggingInterceptor,
),
)
pb.RegisterUserServiceServer(s, &UserServer{repo: newUserRepo()})
log.Println("gRPC 服务启动在 :50051")
if err := s.Serve(lis); err != nil {
log.Fatalf("服务失败: %v", err)
}
}客户端调用
go
func main() {
// 创建连接
conn, err := grpc.Dial("localhost:50051",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(10*1024*1024), // 10MB
),
)
if err != nil {
log.Fatalf("连接失败: %v", err)
}
defer conn.Close()
client := pb.NewUserServiceClient(conn)
// 一元调用
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
resp, err := client.GetUser(ctx, &pb.GetUserRequest{Id: 1})
if err != nil {
// 解析 gRPC 错误
st, ok := status.FromError(err)
if ok {
switch st.Code() {
case codes.NotFound:
fmt.Println("用户不存在")
case codes.DeadlineExceeded:
fmt.Println("请求超时")
default:
fmt.Println("错误:", st.Message())
}
}
return
}
fmt.Println("用户:", resp.User.Name)
// 服务端流
stream, err := client.ListUsers(ctx, &pb.ListUsersRequest{Page: 1, PageSize: 10})
if err != nil {
log.Fatal(err)
}
for {
user, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatal(err)
}
fmt.Println("用户:", user.Name)
}
}拦截器(中间件)
go
// 一元拦截器
func LoggingInterceptor(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
start := time.Now()
resp, err := handler(ctx, req)
log.Printf("方法: %s, 耗时: %v, 错误: %v",
info.FullMethod, time.Since(start), err)
return resp, err
}
// 认证拦截器
func AuthInterceptor(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
// 从 metadata 获取 token
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, status.Error(codes.Unauthenticated, "缺少 metadata")
}
tokens := md.Get("authorization")
if len(tokens) == 0 {
return nil, status.Error(codes.Unauthenticated, "缺少 token")
}
userID, err := validateToken(tokens[0])
if err != nil {
return nil, status.Error(codes.Unauthenticated, "无效 token")
}
ctx = context.WithValue(ctx, userIDKey, userID)
return handler(ctx, req)
}TLS 安全连接
go
// 服务端
creds, err := credentials.NewServerTLSFromFile("server.crt", "server.key")
s := grpc.NewServer(grpc.Creds(creds))
// 客户端
creds, err := credentials.NewClientTLSFromFile("ca.crt", "")
conn, err := grpc.Dial("localhost:50051", grpc.WithTransportCredentials(creds))gRPC 最佳实践
- 使用
status.Errorf返回标准错误码,客户端可以精确处理 - 流式 RPC 要处理
io.EOF和 context 取消 - 生产环境必须启用 TLS
- 使用
buf工具管理 proto 文件和代码生成