极客时间 go云原生
链接: https://pan.baidu.com/share/init?surl=9_bcdX5tl25pbzpKX544lQ 提取码: wzhx
什么是微服务 微服务架构风格是一种将单体应用开发为一套小型服务的方法,每个服务都在自己的进程中运行 ,并且使用轻量级的通信机制(HTTP类型的API)进行通信。
这些服务是围绕业务能力构建的,并且可以通过全自动化的部署机制来进行独立部署。
这些服务可以使用不同的编程语言编写,也能使用不同的数据存储技术。
微服务架构带来的挑战
分布式系统的复杂度
服务依赖管理
数据的一致性保证
测试更加艰难
对DevOps等基础设施要求更高
proto文件
编写protobuf文件
生成代码
编写业务逻辑
调用protoc
时,通过传递 go_opt
标志来提供特定于 protocol-gen-go
的标志位参数。可以传递多个go_opt
标志位参数。例如,当执行下面的命令时:
1 protoc --proto_path=src --go_out=out --go_opt=paths=source_relative foo.proto bar/baz.proto
编译器将从 src
目录中读取输入文件 foo.proto
和 bar/baz.proto
,并将输出文件 foo.pb.go
和 bar/baz.pb.go
写入 out
目录。如果需要,编译器会自动创建嵌套的输出子目录,但不会创建输出目录本身。
oneof Wraper gRPC add 示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 syntax="proto3" ;package pb;option go_package="server/pb" ;message AddRequest { int64 x=1 ; int64 y=2 ; }message AddReply { int64 res=1 ; }service CallService { rpc Add(AddRequest)returns (AddReply) ; }
1 protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative pb/addClient.proto
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 package mainimport ( "context" "fmt" "google.golang.org/grpc" "net" "server/pb" )type AddServer struct { *pb.UnimplementedCallServiceServer }func (*AddServer) Add(ctx context.Context, in *pb.AddRequest) (*pb.AddReply, error ) { sum := in.GetX() + in.GetY() return &pb.AddReply{Res: sum}, nil }func main () { l, err := net.Listen("tcp" , "127.0.0.1:8989" ) if err != nil { fmt.Println("Listen failed" , err) return } fmt.Println(l.Addr().String()) s := grpc.NewServer() pb.RegisterCallServiceServer(s, &AddServer{}) if err = s.Serve(l); err != nil { fmt.Println("s.Server failed,err:" , err) return } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 package mainimport ( "client/pb" "context" "fmt" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "time" )func main () { conn, err := grpc.Dial("127.0.0.1:8989" , grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { fmt.Println("grpc.Dial failed" , err) return } defer conn.Close() client := pb.NewCallServiceClient(conn) ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() for i := 0 ; i < 10 ; i++ { resp, err := client.Add(ctx, &pb.AddRequest{ X: int64 (i), Y: -100 , }) if err != nil { fmt.Println("client.Add failed" , err) return } fmt.Printf("i: %d resp-> %d\n" , i, resp.Res) } }
元数据(metadata )是指在处理RPC请求和响应过程中需要但又不属于具体业务(例如身份验证详细信息)的信息,采用键值对列表的形式,其中键是string
类型,值通常是[]string
类型,但也可以是二进制数据。gRPC中的 metadata 类似于我们在 HTTP headers中的键值对,元数据可以包含认证token、请求标识和监控标签等。
metadata中的键是大小写不敏感的 ,由字母、数字和特殊字符-
、_
、.
组成并且不能以grpc-
开头(gRPC保留自用),二进制值的键名必须以-bin
结尾。
元数据对 gRPC 本身是不可见的,我们通常是在应用程序代码或中间件中处理元数据,我们不需要在.proto
文件中指定元数据。
如何访问元数据取决于具体使用的编程语言。 在Go语言中我们是用google.golang.org/grpc/metadata 这个库来操作metadata。
metadata 类型定义如下:
1 type MD map [string ][]string
元数据可以像普通map一样读取。注意,这个 map 的值类型是[]string
,因此用户可以使用一个键附加多个值。
第一种方法是使用函数 New
基于map[string]string
创建元数据:
1 md:=metadata.New(map [string ]string {"k1" :"v1" ,"k2" :"v2" })
另一种方法是使用Pairs
。具有相同键的值将合并到一个列表中:
1 2 3 4 5 md:=metadata.Pairs( "k1" ,"v1" , "k1" ,"v2" , "k2" ,"v3" , )
注意: 所有的键将自动转换为小写
元数据中存储二进制数据 在元数据中,键始终是字符串。但是值可以是字符串或二进制数据。要在元数据中存储二进制数据值,只需在密钥中添加“-bin”后缀。在创建元数据时,将对带有“-bin”后缀键的值进行编码:
1 2 3 4 5 md := metadata.Pairs( "key" , "string value" , "key-bin" , string ([]byte {96 , 102 }), )
从上下文获取元数据 1 2 3 4 func (s *server) SomeRPC(ctx context.Context, in *pb.SomeRequest) (*pb.SomeResponse, err) { md, ok := metadata.FromIncomingContext(ctx) }
客户端 有两种方法可以将元数据发送到服务端。推荐的方法是使用 AppendToOutgoingContext
将 kv 对附加到context。无论context中是否已经有元数据都可以使用这个方法。如果先前没有元数据,则添加元数据; 如果context中已经存在元数据,则将 kv 对合并进去。
1 2 3 4 5 6 7 8 9 10 11 ctx := metadata.AppendToOutgoingContext(ctx, "k1" , "v1" , "k1" , "v2" , "k2" , "v3" ) ctx := metadata.AppendToOutgoingContext(ctx, "k3" , "v4" ) response, err := client.SomeRPC(ctx, someRequest) stream, err := client.SomeStreamingRPC(ctx)
或者,可以使用 NewOutgoingContext
将元数据附加到context。但是,这将替换context中的任何已有的元数据,因此必须注意保留现有元数据(如果需要的话)。这个方法比使用 AppendToOutgoingContext
要慢。这方面的一个例子如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 md := metadata.Pairs("k1" , "v1" , "k1" , "v2" , "k2" , "v3" ) ctx := metadata.NewOutgoingContext(context.Background(), md) send, _ := metadata.FromOutgoingContext(ctx) newMD := metadata.Pairs("k3" , "v3" ) ctx = metadata.NewOutgoingContext(ctx, metadata.Join(send, newMD)) response, err := client.SomeRPC(ctx, someRequest) stream, err := client.SomeStreamingRPC(ctx)
客户端可以接收的元数据包括header和trailer。
trailer可以用于服务器希望在处理请求后给客户端发送任何内容,例如在流式RPC中只有等所有结果都流到客户端后才能计算出负载信息,这时候就不能使用headers(header在数据之前,trailer在数据之后)。
引申:HTTP trailer
普通调用
可以使用 CallOption 中的 Header 和 Trailer 函数来获取普通RPC调用发送的header和trailer:
1 2 3 4 5 6 7 8 9 var header, trailer metadata.MD r, err := client.SomeRPC( ctx, someRequest, grpc.Header(&header), grpc.Trailer(&trailer), )
流式调用
使用接口 ClientStream 中的 Header
和 Trailer
函数,可以从返回的流中接收 Header 和 Trailer:
1 2 3 4 5 6 7 stream, err := client.SomeStreamingRPC(ctx) header, err := stream.Header() trailer := stream.Trailer()
服务端 普通调用
在普通调用中,服务器可以调用 grpc 模块中的 SendHeader 和 SetTrailer 函数向客户端发送header和trailer。这两个函数将context作为第一个参数。它应该是 RPC 处理程序的上下文或从中派生的上下文:
1 2 3 4 5 6 7 8 func (s *server) SomeRPC(ctx context.Context, in *pb.someRequest) (*pb.someResponse, error ) { header := metadata.Pairs("header-key" , "val" ) grpc.SendHeader(ctx, header) trailer := metadata.Pairs("trailer-key" , "val" ) grpc.SetTrailer(ctx, trailer) }
流式调用
对于流式调用,可以使用接口 ServerStream 中的 SendHeader
和 SetTrailer
函数发送header和trailer:
1 2 3 4 5 6 7 func (s *server) SomeStreamingRPC(stream pb.Service_SomeStreamingRPCServer) error { header := metadata.Pairs("header-key" , "val" ) stream.SendHeader(header) trailer := metadata.Pairs("trailer-key" , "val" ) stream.SetTrailer(trailer)
要读取客户端发送的元数据,服务器需要从 RPC 上下文检索它。如果是普通RPC调用,则可以使用 RPC 处理程序的上下文。对于流调用,服务器需要从流中获取上下文。
普通调用
1 2 3 4 func (s *server) SomeRPC(ctx context.Context, in *pb.someRequest) (*pb.someResponse, error ) { md, ok := metadata.FromIncomingContext(ctx) }
流式调用
1 2 3 4 func (s *server) SomeStreamingRPC(stream pb.Service_SomeStreamingRPCServer) error { md, ok := metadata.FromIncomingContext(stream.Context()) }
gRPC错误处理 似于HTTP定义了一套响应状态码,gRPC也定义有一些状态码。Go语言中此状态码由codes 定义,本质上是一个uint32。
使用时需导入google.golang.org/grpc/codes
包。
1 import "google.golang.org/grpc/codes"
目前已经定义的状态码有如下几种。
Code
值
含义
OK
0
请求成功
Canceled
1
操作已取消
Unknown
2
未知错误。如果从另一个地址空间接收到的状态值属 于在该地址空间中未知的错误空间,则可以返回此错误的示例。 没有返回足够的错误信息的API引发的错误也可能会转换为此错误
InvalidArgument
3
表示客户端指定的参数无效。 请注意,这与 FailedPrecondition 不同。 它表示无论系统状态如何都有问题的参数(例如,格式错误的文件名)。
DeadlineExceeded
4
表示操作在完成之前已过期。对于改变系统状态的操作,即使操作成功完成,也可能会返回此错误。 例如,来自服务器的成功响应可能已延迟足够长的时间以使截止日期到期。
NotFound
5
表示未找到某些请求的实体(例如,文件或目录)。
AlreadyExists
6
创建实体的尝试失败,因为实体已经存在。
PermissionDenied
7
表示调用者没有权限执行指定的操作。 它不能用于拒绝由耗尽某些资源引起的(使用 ResourceExhausted )。 如果无法识别调用者,也不能使用它(使用 Unauthenticated )。
ResourceExhausted
8
表示某些资源已耗尽,可能是每个用户的配额,或者整个文件系统空间不足
FailedPrecondition
9
指示操作被拒绝,因为系统未处于操作执行所需的状态。 例如,要删除的目录可能是非空的,rmdir 操作应用于非目录等。
Aborted
10
表示操作被中止,通常是由于并发问题,如排序器检查失败、事务中止等。
OutOfRange
11
表示尝试超出有效范围的操作。
Unimplemented
12
表示此服务中未实施或不支持/启用操作。
Internal
13
意味着底层系统预期的一些不变量已被破坏。 如果你看到这个错误,则说明问题很严重。
Unavailable
14
表示服务当前不可用。这很可能是暂时的情况,可以通过回退重试来纠正。 请注意,重试非幂等操作并不总是安全的。
DataLoss
15
表示不可恢复的数据丢失或损坏
Unauthenticated
16
表示请求没有用于操作的有效身份验证凭据
_maxCode
17
-
代码示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 package mainimport ( "client/pb" "context" "fmt" "google.golang.org/genproto/googleapis/rpc/errdetails" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/status" "log" "time" )func main () { conn, err := grpc.Dial("127.0.0.1:8972" , grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { log.Fatalf("grpc.Dial failed,err:%v" , err) return } defer conn.Close() c := pb.NewGreeterClient(conn) ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() resp, err := c.SayHello(ctx, &pb.HelloRequest{Name: "YST" }) if err != nil { s := status.Convert(err) for _, d := range s.Details() { switch info := d.(type ) { case *errdetails.QuotaFailure: fmt.Printf("QuotaFailure:%s\n" , info) default : fmt.Printf("unexpected type:%v\n" , info) } } fmt.Printf("c.SayHello failed, err:%v\n" , err) return } fmt.Println("resp->" , resp.Reply) }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 package mainimport ( "context" "fmt" "google.golang.org/genproto/googleapis/rpc/errdetails" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "net" "server/pb" "sync" )type server struct { pb.UnimplementedGreeterServer mu sync.Mutex count map [string ]int }func (s server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloResponse, error ) { s.mu.Lock() defer s.mu.Unlock() s.count[in.Name]++ if s.count[in.GetName()] > 1 { st := status.New(codes.ResourceExhausted, "reqest limited" ) ds, err := st.WithDetails( &errdetails.QuotaFailure{ Violations: []*errdetails.QuotaFailure_Violation{ { Subject: fmt.Sprintf("name:%s" , in.GetName()), Description: "每个name只能调用一次SayHello" , }, }, }, ) if err != nil { return nil , st.Err() } return nil , ds.Err() } reply := "hello " + in.GetName() return &pb.HelloResponse{Reply: reply}, nil }func main () { l, err := net.Listen("tcp" , "127.0.0.1:8972" ) if err != nil { fmt.Println("net.Listen failed" , err) return } fmt.Printf("begin to listen on addr %v\n" , l.Addr().String()) s := grpc.NewServer() pb.RegisterGreeterServer(s, &server{count: make (map [string ]int )}) if err = s.Serve(l); err != nil { fmt.Printf("s.server failed,%v" , err) return } }
加密或认证 使用服务器身份验证 SSL/TLS gRPC 内置支持 SSL/TLS,可以通过 SSL/TLS 证书建立安全连接,对传输的数据进行加密处理。
这里我们演示如何使用自签名证书进行server端加密。
生成证书 生成私钥 执行下面的命令生成私钥文件——server.key
。
1 openssl ecparam -genkey -name secp384r1 -out server.key
1 2 3 4 5 6 7 8 9 -----BEGIN EC PARAMETERS----- BgUrgQQAIg== -----END EC PARAMETERS----- -----BEGIN EC PRIVATE KEY----- MIGkAgEBBDD7h65GkT5/BTGxG8ZSPzxAyZQcQZeGYs+dcqYbxYVMh01cLG7Q9AfM VMzdPWQSSVygBwYFK4EEACKhZANiAARXP8OxhpgV7JxFoCxY4byJ926gS6xXRX/d EC4oGmbcvN46tsXI9CThGtLlzXbI73ICgqGy1iNiCf+2KHngyH//2VF06h5Zr1XX vjAVBsBzjR648aCGFmG7j+j8TIPc5xU= -----END EC PRIVATE KEY-----
这段文本包含了一个椭圆曲线(Elliptic Curve)私钥的信息,同样以 “—–BEGIN EC PARAMETERS—–” 开头,以 “—–END EC PARAMETERS—–” 结尾。它还包含了相应的私钥,以 “—–BEGIN EC PRIVATE KEY—–” 开头,以 “—–END EC PRIVATE KEY—–” 结尾。这种格式同样是为了在文本之间传输方便而进行的 Base64 编码。
让我们对这两个部分进行解释:
椭圆曲线参数(EC PARAMETERS):
在这个部分,包含了椭圆曲线的相关参数。具体的参数内容在这里是 Base64 编码的,如果需要详细了解,可能需要解码这部分内容。
椭圆曲线私钥(EC PRIVATE KEY):
这一部分包含了使用椭圆曲线加密算法生成的私钥。
Base64 编码的私钥内容。
这段文本表示一个使用椭圆曲线加密算法的密钥对。椭圆曲线密码学在某些情况下相对于传统的 RSA 密码学更高效,因为它提供相同的安全性,但需要更短的密钥长度。这对于资源受限的环境,比如嵌入式设备或移动设备,是非常有利的。
这里生成的是ECC私钥,当然你也可以使用RSA。
生成自签名的证书 为了在证书中添加SANs信息,我们将下面自定义配置保存到server.cnf
文件中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 [ req ] default_bits = 4096 default_md = sha256 distinguished_name = req_distinguished_name req_extensions = req_ext [ req_distinguished_name ] countryName = Country Name (2 letter code) countryName_default = CN stateOrProvinceName = State or Province Name (full name) stateOrProvinceName_default = BEIJING localityName = Locality Name (eg, city) localityName_default = BEIJING organizationName = Organization Name (eg, company) organizationName_default = DEV commonName = Common Name (e.g. server FQDN or YOUR name) commonName_max = 64 commonName_default = liwenzhou.com [ req_ext ] subjectAltName = @alt_names [alt_names] DNS.1 = localhost DNS.2 = liwenzhou.com IP = 127.0.0.1
执行下面的命令生成自签名证书——server.crt
。
1 openssl req -nodes -new -x509 -sha256 -days 3650 -config server.cnf -extensions 'req_ext' -key server.key -out server.crt
建立安全连接 Server端使用credentials.NewServerTLSFromFile
函数分别加载证书server.cert
和秘钥server.key
。
1 2 3 4 5 creds, _ := credentials.NewServerTLSFromFile(certFile, keyFile) s := grpc.NewServer(grpc.Creds(creds)) lis, _ := net.Listen("tcp" , "127.0.0.1:8972" ) s.Serve(lis)
而client端使用上一步生成的证书文件——server.cert
建立安全连接。
1 2 3 4 5 creds, _ := credentials.NewClientTLSFromFile(certFile, "" ) conn, _ := grpc.Dial("127.0.0.1:8972" , grpc.WithTransportCredentials(creds)) client := pb.NewGreeterClient(conn)
除了这种自签名证书的方式外,生产环境对外通信时通常需要使用受信任的CA证书。
拦截器 服务注册与服务发现 服务及其调用方直接与注册中心交互
通过部署基础设施来处理服务发现
主流注册中心对比
上图所说的CP是值CAP理论中的CP
CAP理论
一致性(Consistency):所有节点在同一个时间具有相同的数据
可用性(Availability):保证每个请求不管成功或者失败都有相应
分区容忍性(Partotion tolerance):系统中任意的信息丢失或者是失败都不会影响系统的继续运转
consul服务注册
注册consul节点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 package mainimport ( "context" "fmt" "github.com/hashicorp/consul/api" "google.golang.org/grpc" "google.golang.org/grpc/health" healthpb "google.golang.org/grpc/health/grpc_health_v1" "hello_server/pb" "log" "net" )const serviceName = "hello_server" type server struct { pb.UnimplementedGreeterServer }func (server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloResponse, error ) { reply := "hello," + in.Name return &pb.HelloResponse{Reply: reply}, nil }func GetOutboundIP () (net.IP, error ) { conn, err := net.Dial("udp" , "8.8.8.8:80" ) if err != nil { return nil , err } defer conn.Close() localAddr := conn.LocalAddr().(*net.UDPAddr) return localAddr.IP, nil }func main () { l, err := net.Listen("tcp" , ":8977" ) if err != nil { log.Fatal("failed to listen,err" , err) return } s := grpc.NewServer() pb.RegisterGreeterServer(s, &server{}) healthcheck := health.NewServer() healthpb.RegisterHealthServer(s, healthcheck) cc, err := api.NewClient(api.DefaultConfig()) if err != nil { fmt.Println("api.NewClient failed" , err) return } IP, err := GetOutboundIP() if err != nil { fmt.Println("GetOutboundIP failed" , err) return } fmt.Println(IP.String()) check := &api.AgentServiceCheck{ GRPC: fmt.Sprintf("%s:%d" , IP.String(), 8977 ), Timeout: "10s" , Interval: "10s" , DeregisterCriticalServiceAfter: "1m" , } srv := &api.AgentServiceRegistration{ ID: fmt.Sprintf("%s-%s-%d" , serviceName, IP.String(), 8977 ), Name: serviceName, Tags: []string {"sayhello" , "Forrest" }, Address: IP.String(), Port: 8977 , Check: check, } err = cc.Agent().ServiceRegister(srv) if err != nil { fmt.Println(" cc.Agent().ServiceRegister(srv) failed" , err) return } err = s.Serve(l) if err != nil { log.Fatal("failed to Serve,err" , err) return } }
服务发现
gRPC支持自定义resolver,借助第三方的grpc-consul-resolver 库,我们可以更便捷的实现基于consul的服务发现。
1 import _ "github.com/mbobakov/grpc-consul-resolver"
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 package mainimport ( "context" "flag" "fmt" "github.com/Forrest/backend/yst/hello_client/pb" _ "github.com/mbobakov/grpc-consul-resolver" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "log" "time" )var name = flag.String("name" , "yst" , "通过 -name指定你的名字" )func main () { flag.Parse() conn, err := grpc.Dial( "consul://localhost:8500/hello_server" , grpc.WithTransportCredentials(insecure.NewCredentials()), ) if err != nil { log.Fatal("grpc.Dail failed,err:" , err) return } defer conn.Close() c := pb.NewGreeterClient(conn) ctx, cancel := context.WithTimeout(context.Background(), 1 *time.Second) defer cancel() resp, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name}) if err != nil { log.Printf("c.SayHrllo failed,err:%v" , err) } fmt.Println("resp:" , resp.GetReply()) }
在grpc.Dial
时直接使用类似 consul://[user:password@]127.0.0.127:8555/my-service?[healthy=]&[wait=]&[near=]&[insecure=]&[limit=]&[tag=]&[token=]
的连接字符串来指定连接目标。
目前支持的参数:
Name
格式
介绍
tag
string
根据标签筛选
healthy
true/false
只返回通过所有健康检查的端点。默认值:false
wait
time.ParseDuration
监控变更的等待时间。在这个时间段内,端点将强制刷新。默认值:继承agent的配置
insecure
true/false
允许与consul进行不安全的通信。默认值:true
near
string
按响应持续时间对端点排序。可与“limit”参数有效结合。默认值:”_agent”
limit
int
限制服务的端点数。默认值:无限制
timeout
time.ParseDuration
Http-client超时。默认值:60s
max-backoff
time.ParseDuration
重新连接到consul的最大后退时间。重连从10ms开始,成倍增长,直到到max-backoff。默认值:1s
token
string
Consul token
dc
string
consul数据中心。可选
allow-stale
true/false
允许agent返回过期读的结果 https://developer.hashicorp.com/consul/api-docs/features/consistency#stale
require-consistent
true/false
强制读取完全一致。这比较昂贵,但可以防止执行过期读操作
kratos 框架 新建一个项目
kratos中使用 proto文件 来完成通信 ,新建一个proto文件
1 kratos proto add api/bubble/v1/todo.proto
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 syntax = "proto3" ;package api.bubble.v1;import "google/api/annotations.proto" ;option go_package = "bubble/api/bubble/v1;v1" ;option java_multiple_files = true ;option java_package = "api.bubble.v1" ;service Todo { rpc CreateTodo (CreateTodoRequest) returns (CreateTodoReply) { option (google.api.http) = { post: "/v1/todo" , body: "*" , }; } rpc UpdateTodo (UpdateTodoRequest) returns (UpdateTodoReply) { option (google.api.http) = { put: "/v1/todo/{id}" , body: "*" , }; } rpc DeleteTodo (DeleteTodoRequest) returns (DeleteTodoReply) { option (google.api.http) = { delete: "/v1/todo/{id}" , }; } rpc GetTodo (GetTodoRequest) returns (GetTodoReply) { option (google.api.http) = { get: "/v1/todo/{id}" , }; } rpc ListTodo (ListTodoRequest) returns (ListTodoReply) { option (google.api.http) = { get: "/v1/todos" , }; } }message todo { int64 id = 1 ; string title = 2 ; bool status = 3 ; }message CreateTodoRequest { string title = 1 ; }message CreateTodoReply { todo todo = 1 ; }message UpdateTodoRequest { int64 id = 1 ; string title = 2 ; bool status = 3 ; }message UpdateTodoReply { }message DeleteTodoRequest { int64 id = 1 ; }message DeleteTodoReply {}message GetTodoRequest { int64 id = 1 ; }message GetTodoReply { todo todo = 1 ; }message ListTodoRequest {}message ListTodoReply { repeated todo data = 1 ; }
生成proto代码
1 kratos proto client api/bubble/v1/todo.proto
通过proto文件,可以直接生成对应的Service实现代码;使用kratos命令并且通过 -t
指定生成代码的保存目录
1 2 3 kratos proto server api/bubble/v1/todo.proto -t internal/service kratos proto client api/review/v1/review_error.proto
server 端完整实例
服务注册 健康检查 接受退出服务信号服务注销
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 package mainimport ( "context" "fmt" "github.com/hashicorp/consul/api" "google.golang.org/grpc" "google.golang.org/grpc/health" healthpb "google.golang.org/grpc/health/grpc_health_v1" "hello_server/pb" "log" "net" "os" "os/signal" "syscall" )const serviceName = "hello_server" type server struct { pb.UnimplementedGreeterServer }func (server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloResponse, error ) { reply := "hello," + in.Name return &pb.HelloResponse{Reply: reply}, nil }type consul struct { client *api.Client }func NewConsul (addr string ) (*consul, error ) { cfg := api.DefaultConfig() client, err := api.NewClient(cfg) if err != nil { return nil , err } return &consul{client: client}, nil }func (cc *consul) RegisterService(serviceName string , IP string , port int ) error { check := &api.AgentServiceCheck{ GRPC: fmt.Sprintf("%s:%d" , IP, 8972 ), Timeout: "10s" , Interval: "10s" , DeregisterCriticalServiceAfter: "1m" , } srv := &api.AgentServiceRegistration{ ID: fmt.Sprintf("%s-%s-%d" , serviceName, IP, port), Name: serviceName, Tags: []string {"important" , "Forrest" }, Address: IP, Port: port, Check: check, } return cc.client.Agent().ServiceRegister(srv) }func GetOutboundIP () (net.IP, error ) { conn, err := net.Dial("udp" , "8.8.8.8:80" ) if err != nil { return nil , err } defer conn.Close() localAddr := conn.LocalAddr().(*net.UDPAddr) return localAddr.IP, nil }func (c *consul) Deregister(serviceID string ) error { return c.client.Agent().ServiceDeregister(serviceID) }func main () { port := 8972 lis, err := net.Listen("tcp" , fmt.Sprintf(":%d" , port)) if err != nil { fmt.Printf("failed to listen: %v" , err) return } s := grpc.NewServer() healthcheck := health.NewServer() healthpb.RegisterHealthServer(s, healthcheck) pb.RegisterGreeterServer(s, &server{}) consul, err := NewConsul("127.0.0.1:8500" ) if err != nil { log.Fatalf("NewConsul failed, err:%v\n" , err) } ipObj, _ := GetOutboundIP() ip := ipObj.String() serviceId := fmt.Sprintf("%s-%s-%d" , "hello" , ip, port) fmt.Println(serviceId) consul.RegisterService(serviceId, ip, port) go func () { err = s.Serve(lis) if err != nil { log.Printf("failed to serve: %v" , err) return } }() quit := make (chan os.Signal, 1 ) signal.Notify(quit, syscall.SIGTERM, syscall.SIGINT) <-quit consul.Deregister(serviceId) }
依赖注入工具 wire google开源,编译期间完成依赖注入。
provider(提供者)
可导出的
injector(注入器)
应用程序中使用一个注入器来提供 提供者,注入器就是一个按照依赖顺序调用提供者
使用 wire
时,只需要编写注入器的函数签名,然后 wire
会生成对应的 函数体
项目实战
系统分析 输入输出
功能模块
C端:用户端,包括发表评论和查看评论的用户
B端:商家端,店铺商家端,店铺管理者,商品发布者
O端:运营端,运营的同学在后台负责审核用户评论,处理商家申诉,以及评论进行运营活动
按照数据流动的角度去分析 电商
读多写少
UGC评论系统
读多写多
管理项目于代码的方式
项目中如何管理pb文件
通常公司中是把 proto文件和生成的不同语言的代码都放在一个独立的公用代码库
别的项目直接引用这个公用的代码
1 git submodule add git@github.com:Q1mi/reviewapis.git ./api
更新submodule
用来初始化本地 配置文件
添加到安全组
1 git config --global --add safe.directory D:/Go_WorkSpace/review-b
代码实现 基本框架
1 kratos proto add api/review/v1/review.proto
书写proto文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 syntax = "proto3" ;package api.review.v1;import "google/api/annotations.proto" ;import "validate/validate.proto" ;option go_package = "review/api/review/v1;v1" ;option java_multiple_files = true ;option java_package = "api.review.v1" ;service Review { rpc CreateReview (CreateReviewRequest) returns (CreateReviewReply) { option (google.api.http) = { post: "/v1/review" , body: "*" }; }; rpc UpdateReview (UpdateReviewRequest) returns (UpdateReviewReply) ; rpc DeleteReview (DeleteReviewRequest) returns (DeleteReviewReply) ; rpc GetReview (GetReviewRequest) returns (GetReviewReply) ; rpc ListReview (ListReviewRequest) returns (ListReviewReply) ; }message CreateReviewRequest { int64 userID = 1 [(validate.rules).int64 = {gt: 0 }]; int64 orderID = 2 [(validate.rules).int64 = {gt: 0 }]; int32 score = 3 [(validate.rules).int32 = {in: [1 ,2 ,3 ,4 ,5 ]}]; int32 serviceScore = 4 [(validate.rules).int32 = {in: [1 ,2 ,3 ,4 ,5 ]}]; int32 expressScore = 5 [(validate.rules).int32 = {in: [1 ,2 ,3 ,4 ,5 ]}]; string content = 6 [(validate.rules).string = {min_len: 8 , max_len: 255 }]; string picInfo = 7 ; string videoInfo = 8 ; bool anonymous = 9 ; }message CreateReviewReply { int64 reviewID = 1 ; }message UpdateReviewRequest {}message UpdateReviewReply {}message DeleteReviewRequest {}message DeleteReviewReply {}message GetReviewRequest {}message GetReviewReply {}message ListReviewRequest {}message ListReviewReply {}
生成client代码
1 kratos proto client api/review/v1/review.proto
1 kratos proto server api/review/v1/review.proto -t internal/service
server- > service -> biz -> data
wire
gen data层的配置
服务注册
配置文件
server 层
写 Registry的构造函数
添加到 provider中
// ProviderSet is server providers.
var ProviderSet = wire.NewSet(NewGRPCServer, NewHTTPServer, NewRegistrar)
func NewRegistrar(conf *conf.Registry) registry.Registrar {
c := consulAPI.DefaultConfig()
c.Address = conf.Consul.Address
c.Scheme = conf.Consul.Scheme
cli, err := consulAPI.NewClient(c)
if err != nil {
panic(err)
}
r := consul.New(cli, consul.WithHealthCheck(false))
return r
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 - cmd - Name Version - ```go var ( // Name is the name of the compiled software. Name = "review.service" // Version is the version of the compiled software. Version string = "v0.1" // flagconf is the config flag. flagconf string id, _ = os.Hostname() )
newApp 参数填充
func newApp(logger log.Logger, rr registry.Registrar, gs *grpc.Server, hs *http.Server) *kratos.App {
return kratos.New(
kratos.ID(id),
kratos.Name(Name),
kratos.Version(Version),
kratos.Metadata(map[string]string{}),
kratos.Logger(logger),
kratos.Server(
gs,
hs,
),
kratos.Registrar(rr),
)
}
1 2 3 4 5 6 7 8 9 10 - 解析 pb文件结构体 - ```go var rc conf .Registry if err := c.Scan(&rc); err != nil { panic(err ) } app , cleanup, err := wireApp(bc.Server, &rc, bc.Data, logger)
填充 wireApp
// wireApp init kratos application.
func wireApp(*conf.Server, *conf.Registry, *conf.Data, log.Logger) (*kratos.App, func(), error) {
panic(wire.Build(server.ProviderSet, data.ProviderSet, biz.ProviderSet, service.ProviderSet, newApp))
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 #### 项目sql ```sqlCREATE TABLE review_info ( `id` bigint(32 ) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键' , `create_by` varchar(48 ) NOT NULL DEFAULT '' COMMENT '创建方标识' , `update_by` varchar(48 ) NOT NULL DEFAULT '' COMMENT '更新方标识' , `create_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间' , `update_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间' , `delete_at` timestamp COMMENT '逻辑删除标记' , `version` int(10 ) unsigned NOT NULL DEFAULT '0' COMMENT '乐观锁标记' , `review_id` bigint(32 ) NOT NULL DEFAULT '0' COMMENT '评价id' , `content` varchar(512 ) NOT NULL COMMENT '评价内容' , `score` tinyint(4 ) NOT NULL DEFAULT '0' COMMENT '评分' , `service_score` tinyint(4 ) NOT NULL DEFAULT '0' COMMENT '商家服务评分' , `express_score` tinyint(4 ) NOT NULL DEFAULT '0' COMMENT '物流评分' , `has_media` tinyint(4 ) NOT NULL DEFAULT '0' COMMENT '是否有图或视频' , `order_id` bigint(32 ) NOT NULL DEFAULT '0' COMMENT '订单id' , `sku_id` bigint(32 ) NOT NULL DEFAULT '0' COMMENT 'sku id' , `spu_id` bigint(32 ) NOT NULL DEFAULT '0' COMMENT 'spu id' , `store_id` bigint(32 ) NOT NULL DEFAULT '0' COMMENT '店铺id' , `user_id` bigint(32 ) NOT NULL DEFAULT '0' COMMENT '用户id' , `anonymous` tinyint(4 ) NOT NULL DEFAULT '0' COMMENT '是否匿名' , `tags` varchar(1024 ) NOT NULL DEFAULT '' COMMENT '标签json' , `pic_info` varchar(1024 ) NOT NULL DEFAULT '' COMMENT '媒体信息:图片' , `video_info` varchar(1024 ) NOT NULL DEFAULT '' COMMENT '媒体信息:视频' , `status` tinyint(4 ) NOT NULL DEFAULT '10' COMMENT '状态:10待审核;20审核通过;30审核不通过;40隐藏' , `is_default` tinyint(4 ) NOT NULL DEFAULT '0' COMMENT '是否默认评价' , `has_reply` tinyint(4 ) NOT NULL DEFAULT '0' COMMENT '是否有商家回复:0无;1有' , `op_reason` varchar(512 ) NOT NULL DEFAULT '' COMMENT '运营审核拒绝原因' , `op_remarks` varchar(512 ) NOT NULL DEFAULT '' COMMENT '运营备注' , `op_user` varchar(64 ) NOT NULL DEFAULT '' COMMENT '运营者标识' , `goods_snapshoot` varchar(2048 ) NOT NULL DEFAULT '' COMMENT '商品快照信息' , `ext_json` varchar(1024 ) NOT NULL DEFAULT '' COMMENT '信息扩展' , `ctrl_json` varchar(1024 ) NOT NULL DEFAULT '' COMMENT '控制扩展' , PRIMARY KEY (`id` ), KEY `idx_delete_at` (`delete_at` ) COMMENT '逻辑删除索引' , UNIQUE KEY `uk_review_id` (`review_id` ) COMMENT '评价id索引' , KEY `idx_order_id` (`order_id` ) COMMENT '订单id索引' , KEY `idx_user_id` (`user_id` ) COMMENT '用户id索引' ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='评价表' ;CREATE TABLE review_reply_info ( `id` bigint(32 ) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键' , `create_by` varchar(48 ) NOT NULL DEFAULT '' COMMENT '创建方标识' , `update_by` varchar(48 ) NOT NULL DEFAULT '' COMMENT '更新方标识' , `create_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间' , `update_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间' , `delete_at` timestamp COMMENT '逻辑删除标记' , `version` int(10 ) unsigned NOT NULL DEFAULT '0' COMMENT '乐观锁标记' , `reply_id` bigint(32 ) NOT NULL DEFAULT '0' COMMENT '回复id' , `review_id` bigint(32 ) NOT NULL DEFAULT '0' COMMENT '评价id' , `store_id` bigint(32 ) NOT NULL DEFAULT '0' COMMENT '店铺id' , `content` varchar(512 ) NOT NULL COMMENT '评价内容' , `pic_info` varchar(1024 ) NOT NULL DEFAULT '' COMMENT '媒体信息:图片' , `video_info` varchar(1024 ) NOT NULL DEFAULT '' COMMENT '媒体信息:视频' , `ext_json` varchar(1024 ) NOT NULL DEFAULT '' COMMENT '信息扩展' , `ctrl_json` varchar(1024 ) NOT NULL DEFAULT '' COMMENT '控制扩展' , PRIMARY KEY (`id` ), KEY `idx_delete_at` (`delete_at` ) COMMENT '逻辑删除索引' , UNIQUE KEY `uk_reply_id` (`reply_id` ) COMMENT '回复id索引' , KEY `idx_review_id` (`review_id` ) COMMENT '评价id索引' , KEY `idx_store_id` (`store_id` ) COMMENT '店铺id索引' )ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='评价商家回复表' ;CREATE TABLE review_appeal_info ( `id` bigint(32 ) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键' , `create_by` varchar(48 ) NOT NULL DEFAULT '' COMMENT '创建方标识' , `update_by` varchar(48 ) NOT NULL DEFAULT '' COMMENT '更新方标识' , `create_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间' , `update_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间' , `delete_at` timestamp COMMENT '逻辑删除标记' , `version` int(10 ) unsigned NOT NULL DEFAULT '0' COMMENT '乐观锁标记' , `appeal_id` bigint(32 ) NOT NULL DEFAULT '0' COMMENT '回复id' , `review_id` bigint(32 ) NOT NULL DEFAULT '0' COMMENT '评价id' , `store_id` bigint(32 ) NOT NULL DEFAULT '0' COMMENT '店铺id' , `status` tinyint(4 ) NOT NULL DEFAULT '10' COMMENT '状态:10待审核;20申诉通过;30申诉驳回' , `reason` varchar(255 ) NOT NULL COMMENT '申诉原因类别' , `content` varchar(255 ) NOT NULL COMMENT '申诉内容描述' , `pic_info` varchar(1024 ) NOT NULL DEFAULT '' COMMENT '媒体信息:图片' , `video_info` varchar(1024 ) NOT NULL DEFAULT '' COMMENT '媒体信息:视频' , `op_remarks` varchar(512 ) NOT NULL DEFAULT '' COMMENT '运营备注' , `op_user` varchar(64 ) NOT NULL DEFAULT '' COMMENT '运营者标识' , `ext_json` varchar(1024 ) NOT NULL DEFAULT '' COMMENT '信息扩展' , `ctrl_json` varchar(1024 ) NOT NULL DEFAULT '' COMMENT '控制扩展' , PRIMARY KEY (`id` ), KEY `idx_delete_at` (`delete_at` ) COMMENT '逻辑删除索引' , KEY `idx_appeal_id` (`appeal_id` ) COMMENT '申诉id索引' , UNIQUE KEY `uk_review_id` (`review_id` ) COMMENT '评价id索引' , KEY `idx_store_id` (`store_id` ) COMMENT '店铺id索引' )ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='评价商家申诉表' ;
表结构
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 +------------+---------------------+------+-----+---------------------+-----------------------------+ | Field | Type | Null | Key | Default | Extra | +------------+---------------------+------+-----+---------------------+-----------------------------+ | id | bigint(32) unsigned | NO | PRI | NULL | auto_increment | | create_by | varchar(48) | NO | | | | | update_by | varchar(48) | NO | | | | | create_at | timestamp | NO | | CURRENT_TIMESTAMP | | | update_at | timestamp | NO | | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP | | delete_at | timestamp | NO | MUL | 0000-00-00 00:00:00 | | | version | int(10) unsigned | NO | | 0 | | | appeal_id | bigint(32) | NO | MUL | 0 | | | review_id | bigint(32) | NO | UNI | 0 | | | store_id | bigint(32) | NO | MUL | 0 | | | status | tinyint(4) | NO | | 10 | | | reason | varchar(255) | NO | | NULL | | | content | varchar(255) | NO | | NULL | | | pic_info | varchar(1024) | NO | | | | | video_info | varchar(1024) | NO | | | | | op_remarks | varchar(512) | NO | | | | | op_user | varchar(64) | NO | | | | | ext_json | varchar(1024) | NO | | | | | ctrl_json | varchar(1024) | NO | | | | +------------+---------------------+------+-----+---------------------+-----------------------------+
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 +------------+---------------------+------+-----+---------------------+-----------------------------+ | Field | Type | Null | Key | Default | Extra | +------------+---------------------+------+-----+---------------------+-----------------------------+ | id | bigint(32) unsigned | NO | PRI | NULL | auto_increment | | create_by | varchar(48) | NO | | | | | update_by | varchar(48) | NO | | | | | create_at | timestamp | NO | | CURRENT_TIMESTAMP | | | update_at | timestamp | NO | | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP | | delete_at | timestamp | NO | MUL | 0000-00-00 00:00:00 | | | version | int(10) unsigned | NO | | 0 | | | reply_id | bigint(32) | NO | UNI | 0 | | | review_id | bigint(32) | NO | MUL | 0 | | | store_id | bigint(32) | NO | MUL | 0 | | | content | varchar(512) | NO | | NULL | | | pic_info | varchar(1024) | NO | | | | | video_info | varchar(1024) | NO | | | | | ext_json | varchar(1024) | NO | | | | | ctrl_json | varchar(1024) | NO | | | | +------------+---------------------+------+-----+---------------------+-----------------------------+
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 +-----------------+---------------------+------+-----+---------------------+-----------------------------+ | Field | Type | Null | Key | Default | Extra | +-----------------+---------------------+------+-----+---------------------+-----------------------------+ | id | bigint(32) unsigned | NO | PRI | NULL | auto_increment | | create_by | varchar(48) | NO | | | | | update_by | varchar(48) | NO | | | | | create_at | timestamp | NO | | CURRENT_TIMESTAMP | | | update_at | timestamp | NO | | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP | | delete_at | timestamp | NO | MUL | 0000-00-00 00:00:00 | | | version | int(10) unsigned | NO | | 0 | | | review_id | bigint(32) | NO | UNI | 0 | | | content | varchar(512) | NO | | NULL | | | score | tinyint(4) | NO | | 0 | | | service_score | tinyint(4) | NO | | 0 | | | express_score | tinyint(4) | NO | | 0 | | | has_media | tinyint(4) | NO | | 0 | | | order_id | bigint(32) | NO | MUL | 0 | | | sku_id | bigint(32) | NO | | 0 | | | spu_id | bigint(32) | NO | | 0 | | | store_id | bigint(32) | NO | | 0 | | | user_id | bigint(32) | NO | MUL | 0 | | | anonymous | tinyint(4) | NO | | 0 | | | tags | varchar(1024) | NO | | | | | pic_info | varchar(1024) | NO | | | | | video_info | varchar(1024) | NO | | | | | status | tinyint(4) | NO | | 10 | | | is_default | tinyint(4) | NO | | 0 | | | has_reply | tinyint(4) | NO | | 0 | | | op_reason | varchar(512) | NO | | | | | op_remarks | varchar(512) | NO | | | | | op_user | varchar(64) | NO | | | | | goods_snapshoot | varchar(2048) | NO | | | | | ext_json | varchar(1024) | NO | | | | | ctrl_json | varchar(1024) | NO | | | | +-----------------+---------------------+------+-----+---------------------+-----------------------------+
待续。。。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 package mainimport ( "fmt" "gorm.io/driver/mysql" "gorm.io/gorm" "gorm.io/gen" )const MySQLDSN = "root:571400yst@tcp(127.0.0.1:3306)/review_service?charset=utf8mb4&parseTime=True" func connectDB (dsn string ) *gorm.DB { db, err := gorm.Open(mysql.Open(dsn)) if err != nil { panic (fmt.Errorf("connect db fail: %w" , err)) } return db }func main () { g := gen.NewGenerator(gen.Config{ OutPath: "../../internal/data/query" , Mode: gen.WithDefaultQuery | gen.WithQueryInterface, }) g.UseDB(connectDB(MySQLDSN)) g.ApplyBasic(g.GenerateAllTable()...) g.Execute() }
GEN的使用 错误处理 snowflake算法 pb 的validate 参数校验 | Kratos (go-kratos.dev)
根据validate的规则给probuf文件写 validate
make validate,生成validate的pb文件
在 grpc
http
中插入 validate.Validator(),
copier库 分页处理 1 2 limit = size offset = (page - 1 ) * size
C端
用户创建评价
用户查询根据userID查询自己的评价
用户根据OrderID查询评价
查询评价回复
B端
回复评价
字段 reviewID、storeID、content、picInfo、vidInfo
查看评价是否存在
水平鉴权
保存到数据库中
申诉评价
O端
AuditReview
AuditAppeal
更新 appeal表的状态
如果申诉成功,则更改info表的状态,显示为 隐藏
review-job/ review-task 后端服务的分类
service :微服务,提供API RPC接口
job:流式服务,处理实时流
检查数据的变更,将多张表的数据整合 加工 处理 组成成一个 document,存入ES中
task 定时任务,定时处理任务
start
stop
go-kafaka canal 配置操作
查询 操作
通过singleFlight合并大量相同的并发查询
Canal mysql的增量数据解析工具 **canal [kə’næl]**,译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
MySQL主备复制原理
MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
canal 工作原理
canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
canal 解析 binary log 对象(原始为 byte 流)
待续。。。。
1 show variables like 'log_bin';
CQRS架构和Elastic Search 为什么要使用 ES来 实现O端的数据查询?
O端需要 多条件的 复制查询(多字段 跨表 多数据 ->超时)
CQRS架构是什么
是 命令查询职责分离 (command QueryResponsibility Segregation)的缩写,在基于CQRS 的系统中,命令(写操作)和查询操作(读操作)所使用的数据模型是由区别的。通过各种机制将命令模型中的变更传播到查询模型中,让两个模型保持一致。
ElasticSearch ElasticSearch架构主要由三个组件构成:索引、分片和节点
索引是文档的逻辑分组,类似于数据库的表
分片是索引的物理分区,用于提高数据库的分布和性能提升
节点是运行ElasticSearch的节点
工作原理
接受用户的查询请求:ElasticSearch通过RESTful API 或者是 JSON请求接受用户到查询请求
路由请求:接收到查询请求后,ElasticSearch根据请求中的索引和分区的信息,将请求路由到相应的节点
执行查询:节点执行查询请求,并在相应的索引中 查找匹配的文档
返回结果:结果以json的格式返回给用户,包括匹配的文档和相应字段信息
我的踩坑记录
mysql的DSN没加charset=utf8mb4&parseTime=True
报错gorm sql: Scan error on column index 3, name "create_at": unsupported Scan, storing driver.Value type []uint8 into type *time.Time
makefile的插件 对空格的识别问题
记得在httpSrv和grpc中添加 validate.Validator(),
server 层进行格式转换(proto的数据转为GEN的数据格式)
空指针异常 这行代码只是分配了内存空间,并没有为每个元素分配内存。因此,reviewInfos
中的每个元素都是 nil
,即空指针。
方法一
1 2 3 4 5 6 reviewInfos := make ([]*ReviewInfo, len (reviews)) for i := range reviews { reviewInfos[i] = &ReviewInfo{} }
方法二
limit 因该在 offset前
创建基本表
1 2 3 4 5 6 7 8 9 CREATE TABLE book ( `id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT '主键', `title` varchar(128) NOT NULL COMMENT '书籍名称', `author` varchar(128) NOT NULL COMMENT '作者', `price` int NOT NULL DEFAULT '0' COMMENT '价格', `publish_date` datetime COMMENT '出版日期', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='书籍表';
配置GEN,生成对应代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 package mainimport ( "fmt" "gorm.io/driver/mysql" "gorm.io/gorm" "gorm.io/gen" )const MySQLDSN = "root:571400yst@tcp(127.0.0.1:3306)/testdb1?charset=utf8mb4&parseTime=True" func connectDB (dsn string ) *gorm.DB { db, err := gorm.Open(mysql.Open(dsn)) if err != nil { panic (fmt.Errorf("connect db fail: %w" , err)) } return db }func main () { g := gen.NewGenerator(gen.Config{ OutPath: "../../dal/query" , Mode: gen.WithDefaultQuery | gen.WithQueryInterface, }) g.UseDB(connectDB(MySQLDSN)) g.ApplyBasic(g.GenerateAllTable()...) g.Execute() }
目录结构
1 2 3 4 5 6 7 8 9 10 11 12 . ├── cmd │ └── gen │ └── generate.go ├── dal │ ├── model │ │ └── book.gen.go │ └── query │ ├── book.gen.go │ └── gen.go ├── go.mod └── go.sum
用一用生成的代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 package mainimport ( "context" "fmt" "gen_demo/dal/model" "gen_demo/dal/query" "gorm.io/gen/examples/dal" "time" )const MySQLDSN = "root:571400yst@tcp(127.0.0.1:3306)/testdb1?charset=utf8mb4&parseTime=True" func init () { dal.DB = dal.ConnectDB(MySQLDSN).Debug() }func main () { query.SetDefault(dal.DB) b1 := &model.Book{ Title: "book1" , Author: "Forrest" , Price: 100 , PublishDate: time.Now(), } err := query.Book.WithContext(context.Background()).Create(b1) if err != nil { fmt.Printf("Create book failed,err:%v\n" , err) return } ret, err := query.Book.WithContext(context.Background()). Where(query.Book.ID.Eq(1 )). Update(query.Book.Title, "book-2" ) if err != nil { fmt.Printf("update book failed,err:%v\n" , err) return } fmt.Printf("-> ret:%#v\n" , ret) info, err := query.Book.WithContext(context.Background()). Where(query.Book.ID.Eq(1 )).First() if err != nil { fmt.Printf("select book failed,err:%v\n" , err) return } fmt.Printf("-> info:%#v\n" , info) ret, err = query.Book.WithContext(context.Background()).Where(query.Book.ID.Eq(1 )).Delete() if err != nil { fmt.Printf("select book failed,err:%v\n" , err) return } fmt.Printf("-> ret:%#v\n" , ret) fmt.Printf("done..." ) }
对应的目录结构
1 2 3 4 5 6 7 8 9 10 11 12 13 14 . ├── cmd │ └── gen │ └── generate.go ├── dal │ ├── model │ │ ├── book.gen.go │ │ └── db.go │ └── query │ ├── book.gen.go │ └── gen.go ├── go.mod ├── go.sum └── main.go
自定义SQL查询 Gen框架使用模板注释的方法支持自定义SQL查询,我们只需要按对应规则将SQL语句注释到interface的方法上即可。Gen将对其进行解析,并为应用的结构生成查询API。
通常建议将自定义查询方法添加到model
模块下
注释语法 Gen 为动态条件 SQL 支持提供了一些约定语法,分为三个方面:
返回结果
占位符
含义
gen.T
用于返回数据的结构体,会根据生成结构体或者数据库表结构自动生成
gen.M
表示map[string]interface{}
,用于返回数据
gen.RowsAffected
用于执行SQL进行更新或删除时候,用于返回影响行数
error
返回错误(如果有)
示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 package modelimport "gorm.io/gen" type Querier interface { GetByID(id int ) (gen.T, error ) GetByIDReturnMap(id int ) (gen.M, error ) GetBooksByAuthor(author string ) ([]*gen.T, error ) }
在cmd/gen/generate.go中添加自定义方法绑定关系
1 2 g.ApplyInterface(func (model.Querier) {}, g.GenerateModel("book" ))
重新生成代码后就能使用自定义方法了
名称
描述
@@table
转义和引用表名
@@<name>
从参数中转义并引用表/列名
@<name>
参数中的SQL查询参数
事务操作 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 err = r.data.query.Transaction(func (tx *query.Query) error { if err := tx.ReviewReplyInfo. WithContext(ctx). Save(reply); err != nil { r.log.WithContext(ctx).Errorf("SaveReply create reply fail, err:%v" , err) return err } if _, err := tx.ReviewInfo. WithContext(ctx). Where(tx.ReviewInfo.ReviewID.Eq(reply.ReviewID)). Update(tx.ReviewInfo.HasReply, 1 ); err != nil { r.log.WithContext(ctx).Errorf("SaveReply update review fail, err:%v" , err) return err } return nil })