go语言微服务与云原生

极客时间 go云原生

链接: https://pan.baidu.com/share/init?surl=9_bcdX5tl25pbzpKX544lQ 提取码: wzhx

什么是微服务

微服务架构风格是一种将单体应用开发为一套小型服务的方法,每个服务都在自己的进程中运行,并且使用轻量级的通信机制(HTTP类型的API)进行通信。

这些服务是围绕业务能力构建的,并且可以通过全自动化的部署机制来进行独立部署。

这些服务可以使用不同的编程语言编写,也能使用不同的数据存储技术。

image-20231207131818485

微服务架构带来的挑战

  • 分布式系统的复杂度
  • 服务依赖管理
  • 数据的一致性保证
  • 测试更加艰难
  • 对DevOps等基础设施要求更高

image-20231207133256660

image-20231207133534770

image-20231207133704833

proto文件

  1. 编写protobuf文件
  2. 生成代码
  3. 编写业务逻辑

调用protoc时,通过传递 go_opt 标志来提供特定于 protocol-gen-go 的标志位参数。可以传递多个go_opt标志位参数。例如,当执行下面的命令时:

1
protoc --proto_path=src --go_out=out --go_opt=paths=source_relative foo.proto bar/baz.proto

编译器将从 src 目录中读取输入文件 foo.protobar/baz.proto,并将输出文件 foo.pb.gobar/baz.pb.go 写入 out 目录。如果需要,编译器会自动创建嵌套的输出子目录,但不会创建输出目录本身。

oneof

Wraper

gRPC

add 示例

  • proto 文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
 syntax="proto3";

package pb;

option go_package="server/pb";

message AddRequest{
int64 x=1;
int64 y=2;
}

message AddReply{
int64 res=1;
}

service CallService{
rpc Add(AddRequest)returns(AddReply);
}
1
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative pb/addClient.proto 
  • server 端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package main

import (
"context"
"fmt"
"google.golang.org/grpc"
"net"
"server/pb"
)

type AddServer struct {
*pb.UnimplementedCallServiceServer
}

func (*AddServer) Add(ctx context.Context, in *pb.AddRequest) (*pb.AddReply, error) {
sum := in.GetX() + in.GetY()
return &pb.AddReply{Res: sum}, nil
}

func main() {
l, err := net.Listen("tcp", "127.0.0.1:8989")
if err != nil {
fmt.Println("Listen failed", err)
return
}
fmt.Println(l.Addr().String())
s := grpc.NewServer()
pb.RegisterCallServiceServer(s, &AddServer{})
if err = s.Serve(l); err != nil {
fmt.Println("s.Server failed,err:", err)
return
}
}
  • client 端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package main

import (
"client/pb"
"context"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"time"
)

func main() {
conn, err := grpc.Dial("127.0.0.1:8989", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
fmt.Println("grpc.Dial failed", err)
return
}
defer conn.Close()
client := pb.NewCallServiceClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

for i := 0; i < 10; i++ {
resp, err := client.Add(ctx, &pb.AddRequest{
X: int64(i),
Y: -100,
})
if err != nil {
fmt.Println("client.Add failed", err)
return
}
fmt.Printf("i: %d resp-> %d\n", i, resp.Res)
}

}

metadata

元数据(metadata)是指在处理RPC请求和响应过程中需要但又不属于具体业务(例如身份验证详细信息)的信息,采用键值对列表的形式,其中键是string类型,值通常是[]string类型,但也可以是二进制数据。gRPC中的 metadata 类似于我们在 HTTP headers中的键值对,元数据可以包含认证token、请求标识和监控标签等。

metadata中的键是大小写不敏感的,由字母、数字和特殊字符-_.组成并且不能以grpc-开头(gRPC保留自用),二进制值的键名必须以-bin结尾。

元数据对 gRPC 本身是不可见的,我们通常是在应用程序代码或中间件中处理元数据,我们不需要在.proto文件中指定元数据。

如何访问元数据取决于具体使用的编程语言。 在Go语言中我们是用google.golang.org/grpc/metadata这个库来操作metadata。

metadata 类型定义如下:

1
type MD map[string][]string

元数据可以像普通map一样读取。注意,这个 map 的值类型是[]string,因此用户可以使用一个键附加多个值。

创建新的metadata

第一种方法是使用函数 New 基于map[string]string 创建元数据:

1
md:=metadata.New(map[string]string{"k1":"v1","k2":"v2"})

另一种方法是使用Pairs。具有相同键的值将合并到一个列表中:

1
2
3
4
5
md:=metadata.Pairs(
"k1","v1",
"k1","v2",//k1 的值为 []string{"v1","v2"}
"k2","v3",
)

注意: 所有的键将自动转换为小写

元数据中存储二进制数据

在元数据中,键始终是字符串。但是值可以是字符串或二进制数据。要在元数据中存储二进制数据值,只需在密钥中添加“-bin”后缀。在创建元数据时,将对带有“-bin”后缀键的值进行编码:

1
2
3
4
5
md := metadata.Pairs(
"key", "string value",
"key-bin", string([]byte{96, 102}), // 二进制数据在发送前会进行(base64) 编码
// 收到后会进行解码
)
从上下文获取元数据
1
2
3
4
func (s *server) SomeRPC(ctx context.Context, in *pb.SomeRequest) (*pb.SomeResponse, err) {
md, ok := metadata.FromIncomingContext(ctx)
// do something with metadata
}
客户端
发送metadata

有两种方法可以将元数据发送到服务端。推荐的方法是使用 AppendToOutgoingContext 将 kv 对附加到context。无论context中是否已经有元数据都可以使用这个方法。如果先前没有元数据,则添加元数据; 如果context中已经存在元数据,则将 kv 对合并进去。

1
2
3
4
5
6
7
8
9
10
11
// 创建带有metadata的context
ctx := metadata.AppendToOutgoingContext(ctx, "k1", "v1", "k1", "v2", "k2", "v3")

// 添加一些 metadata 到 context (e.g. in an interceptor)
ctx := metadata.AppendToOutgoingContext(ctx, "k3", "v4")

// 发起普通RPC请求
response, err := client.SomeRPC(ctx, someRequest)

// 或者发起流式RPC请求
stream, err := client.SomeStreamingRPC(ctx)

或者,可以使用 NewOutgoingContext 将元数据附加到context。但是,这将替换context中的任何已有的元数据,因此必须注意保留现有元数据(如果需要的话)。这个方法比使用 AppendToOutgoingContext 要慢。这方面的一个例子如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 创建带有metadata的context
md := metadata.Pairs("k1", "v1", "k1", "v2", "k2", "v3")
ctx := metadata.NewOutgoingContext(context.Background(), md)

// 添加一些metadata到context (e.g. in an interceptor)
send, _ := metadata.FromOutgoingContext(ctx)
newMD := metadata.Pairs("k3", "v3")
ctx = metadata.NewOutgoingContext(ctx, metadata.Join(send, newMD))

// 发起普通RPC请求
response, err := client.SomeRPC(ctx, someRequest)

// 或者发起流式RPC请求
stream, err := client.SomeStreamingRPC(ctx)
接受metadata

客户端可以接收的元数据包括header和trailer。

trailer可以用于服务器希望在处理请求后给客户端发送任何内容,例如在流式RPC中只有等所有结果都流到客户端后才能计算出负载信息,这时候就不能使用headers(header在数据之前,trailer在数据之后)。

引申:HTTP trailer

普通调用

可以使用 CallOption 中的 HeaderTrailer 函数来获取普通RPC调用发送的header和trailer:

1
2
3
4
5
6
7
8
9
var header, trailer metadata.MD // 声明存储header和trailer的变量
r, err := client.SomeRPC(
ctx,
someRequest,
grpc.Header(&header), // 将会接收header
grpc.Trailer(&trailer), // 将会接收trailer
)

// do something with header and trailer

流式调用

使用接口 ClientStream 中的 HeaderTrailer 函数,可以从返回的流中接收 Header 和 Trailer:

1
2
3
4
5
6
7
stream, err := client.SomeStreamingRPC(ctx)

// 接收 header
header, err := stream.Header()

// 接收 trailer
trailer := stream.Trailer()
服务端
发送metadata

普通调用

在普通调用中,服务器可以调用 grpc 模块中的 SendHeaderSetTrailer 函数向客户端发送header和trailer。这两个函数将context作为第一个参数。它应该是 RPC 处理程序的上下文或从中派生的上下文:

1
2
3
4
5
6
7
8
func (s *server) SomeRPC(ctx context.Context, in *pb.someRequest) (*pb.someResponse, error) {
// 创建和发送 header
header := metadata.Pairs("header-key", "val")
grpc.SendHeader(ctx, header)
// 创建和发送 trailer
trailer := metadata.Pairs("trailer-key", "val")
grpc.SetTrailer(ctx, trailer)
}

流式调用

对于流式调用,可以使用接口 ServerStream 中的 SendHeaderSetTrailer 函数发送header和trailer:

1
2
3
4
5
6
7
func (s *server) SomeStreamingRPC(stream pb.Service_SomeStreamingRPCServer) error {
// 创建和发送 header
header := metadata.Pairs("header-key", "val")
stream.SendHeader(header)
// 创建和发送 trailer
trailer := metadata.Pairs("trailer-key", "val")
stream.SetTrailer(trailer)
接受metadata

要读取客户端发送的元数据,服务器需要从 RPC 上下文检索它。如果是普通RPC调用,则可以使用 RPC 处理程序的上下文。对于流调用,服务器需要从流中获取上下文。

普通调用

1
2
3
4
func (s *server) SomeRPC(ctx context.Context, in *pb.someRequest) (*pb.someResponse, error) {
md, ok := metadata.FromIncomingContext(ctx)
// do something with metadata
}

流式调用

1
2
3
4
func (s *server) SomeStreamingRPC(stream pb.Service_SomeStreamingRPCServer) error {
md, ok := metadata.FromIncomingContext(stream.Context()) // get context from stream
// do something with metadata
}

gRPC错误处理

似于HTTP定义了一套响应状态码,gRPC也定义有一些状态码。Go语言中此状态码由codes定义,本质上是一个uint32。

1
type Code uint32

使用时需导入google.golang.org/grpc/codes包。

1
import "google.golang.org/grpc/codes"

目前已经定义的状态码有如下几种。

Code 含义
OK 0 请求成功
Canceled 1 操作已取消
Unknown 2 未知错误。如果从另一个地址空间接收到的状态值属 于在该地址空间中未知的错误空间,则可以返回此错误的示例。 没有返回足够的错误信息的API引发的错误也可能会转换为此错误
InvalidArgument 3 表示客户端指定的参数无效。 请注意,这与 FailedPrecondition 不同。 它表示无论系统状态如何都有问题的参数(例如,格式错误的文件名)。
DeadlineExceeded 4 表示操作在完成之前已过期。对于改变系统状态的操作,即使操作成功完成,也可能会返回此错误。 例如,来自服务器的成功响应可能已延迟足够长的时间以使截止日期到期。
NotFound 5 表示未找到某些请求的实体(例如,文件或目录)。
AlreadyExists 6 创建实体的尝试失败,因为实体已经存在。
PermissionDenied 7 表示调用者没有权限执行指定的操作。 它不能用于拒绝由耗尽某些资源引起的(使用 ResourceExhausted )。 如果无法识别调用者,也不能使用它(使用 Unauthenticated )。
ResourceExhausted 8 表示某些资源已耗尽,可能是每个用户的配额,或者整个文件系统空间不足
FailedPrecondition 9 指示操作被拒绝,因为系统未处于操作执行所需的状态。 例如,要删除的目录可能是非空的,rmdir 操作应用于非目录等。
Aborted 10 表示操作被中止,通常是由于并发问题,如排序器检查失败、事务中止等。
OutOfRange 11 表示尝试超出有效范围的操作。
Unimplemented 12 表示此服务中未实施或不支持/启用操作。
Internal 13 意味着底层系统预期的一些不变量已被破坏。 如果你看到这个错误,则说明问题很严重。
Unavailable 14 表示服务当前不可用。这很可能是暂时的情况,可以通过回退重试来纠正。 请注意,重试非幂等操作并不总是安全的。
DataLoss 15 表示不可恢复的数据丢失或损坏
Unauthenticated 16 表示请求没有用于操作的有效身份验证凭据
_maxCode 17 -

代码示例

  • 客户端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package main

import (
"client/pb"
"context"
"fmt"
"google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
"log"
"time"
)

func main() {

conn, err := grpc.Dial("127.0.0.1:8972", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("grpc.Dial failed,err:%v", err)
return
}
defer conn.Close()
//创建客户端
c := pb.NewGreeterClient(conn)

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
resp, err := c.SayHello(ctx, &pb.HelloRequest{Name: "YST"})
if err != nil {
//收到带有detail的error
s := status.Convert(err)
for _, d := range s.Details() {
switch info := d.(type) {
case *errdetails.QuotaFailure:
fmt.Printf("QuotaFailure:%s\n", info)
default:
fmt.Printf("unexpected type:%v\n", info)
}
}
fmt.Printf("c.SayHello failed, err:%v\n", err)
return
}
fmt.Println("resp->", resp.Reply)
}
  • server端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package main

import (
"context"
"fmt"
"google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"net"
"server/pb"
"sync"
)

type server struct {
pb.UnimplementedGreeterServer
mu sync.Mutex
count map[string]int
}

func (s server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloResponse, error) {
s.mu.Lock()
defer s.mu.Unlock()
// 记录name的请求次数
s.count[in.Name]++
if s.count[in.GetName()] > 1 {
// 返回请求次数限制的错误
st := status.New(codes.ResourceExhausted, "reqest limited")
// 添加错误详情信息,需要接收返回的status
ds, err := st.WithDetails(
&errdetails.QuotaFailure{
Violations: []*errdetails.QuotaFailure_Violation{
{
Subject: fmt.Sprintf("name:%s", in.GetName()),
Description: "每个name只能调用一次SayHello",
},
},
},
)
// WithDetails执行失败,返回原来status.Err
if err != nil {
return nil, st.Err()
}
return nil, ds.Err()
}
reply := "hello " + in.GetName()
return &pb.HelloResponse{Reply: reply}, nil
}

func main() {
l, err := net.Listen("tcp", "127.0.0.1:8972")
if err != nil {
fmt.Println("net.Listen failed", err)
return
}
fmt.Printf("begin to listen on addr %v\n", l.Addr().String())
s := grpc.NewServer()
pb.RegisterGreeterServer(s, &server{count: make(map[string]int)})

if err = s.Serve(l); err != nil {
fmt.Printf("s.server failed,%v", err)
return
}
}

加密或认证

使用服务器身份验证 SSL/TLS

gRPC 内置支持 SSL/TLS,可以通过 SSL/TLS 证书建立安全连接,对传输的数据进行加密处理。

这里我们演示如何使用自签名证书进行server端加密。

生成证书
生成私钥

执行下面的命令生成私钥文件——server.key

1
openssl ecparam -genkey -name secp384r1 -out server.key
1
2
3
4
5
6
7
8
9
-----BEGIN EC PARAMETERS-----
BgUrgQQAIg==
-----END EC PARAMETERS-----
-----BEGIN EC PRIVATE KEY-----
MIGkAgEBBDD7h65GkT5/BTGxG8ZSPzxAyZQcQZeGYs+dcqYbxYVMh01cLG7Q9AfM
VMzdPWQSSVygBwYFK4EEACKhZANiAARXP8OxhpgV7JxFoCxY4byJ926gS6xXRX/d
EC4oGmbcvN46tsXI9CThGtLlzXbI73ICgqGy1iNiCf+2KHngyH//2VF06h5Zr1XX
vjAVBsBzjR648aCGFmG7j+j8TIPc5xU=
-----END EC PRIVATE KEY-----

这段文本包含了一个椭圆曲线(Elliptic Curve)私钥的信息,同样以 “—–BEGIN EC PARAMETERS—–” 开头,以 “—–END EC PARAMETERS—–” 结尾。它还包含了相应的私钥,以 “—–BEGIN EC PRIVATE KEY—–” 开头,以 “—–END EC PRIVATE KEY—–” 结尾。这种格式同样是为了在文本之间传输方便而进行的 Base64 编码。

让我们对这两个部分进行解释:

  1. 椭圆曲线参数(EC PARAMETERS):
    • 在这个部分,包含了椭圆曲线的相关参数。具体的参数内容在这里是 Base64 编码的,如果需要详细了解,可能需要解码这部分内容。
  2. 椭圆曲线私钥(EC PRIVATE KEY):
    • 这一部分包含了使用椭圆曲线加密算法生成的私钥。
    • Base64 编码的私钥内容。

这段文本表示一个使用椭圆曲线加密算法的密钥对。椭圆曲线密码学在某些情况下相对于传统的 RSA 密码学更高效,因为它提供相同的安全性,但需要更短的密钥长度。这对于资源受限的环境,比如嵌入式设备或移动设备,是非常有利的。

这里生成的是ECC私钥,当然你也可以使用RSA。

生成自签名的证书

为了在证书中添加SANs信息,我们将下面自定义配置保存到server.cnf文件中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
[ req ]
default_bits = 4096
default_md = sha256
distinguished_name = req_distinguished_name
req_extensions = req_ext

[ req_distinguished_name ]
countryName = Country Name (2 letter code)
countryName_default = CN
stateOrProvinceName = State or Province Name (full name)
stateOrProvinceName_default = BEIJING
localityName = Locality Name (eg, city)
localityName_default = BEIJING
organizationName = Organization Name (eg, company)
organizationName_default = DEV
commonName = Common Name (e.g. server FQDN or YOUR name)
commonName_max = 64
commonName_default = liwenzhou.com

[ req_ext ]
subjectAltName = @alt_names

[alt_names]
DNS.1 = localhost
DNS.2 = liwenzhou.com
IP = 127.0.0.1

执行下面的命令生成自签名证书——server.crt

1
openssl req -nodes -new -x509 -sha256 -days 3650 -config server.cnf -extensions 'req_ext' -key server.key -out server.crt
建立安全连接

Server端使用credentials.NewServerTLSFromFile函数分别加载证书server.cert和秘钥server.key

1
2
3
4
5
creds, _ := credentials.NewServerTLSFromFile(certFile, keyFile)
s := grpc.NewServer(grpc.Creds(creds))
lis, _ := net.Listen("tcp", "127.0.0.1:8972")
// error handling omitted
s.Serve(lis)

而client端使用上一步生成的证书文件——server.cert建立安全连接。

1
2
3
4
5
creds, _ := credentials.NewClientTLSFromFile(certFile, "")
conn, _ := grpc.Dial("127.0.0.1:8972", grpc.WithTransportCredentials(creds))
// error handling omitted
client := pb.NewGreeterClient(conn)
// ...

除了这种自签名证书的方式外,生产环境对外通信时通常需要使用受信任的CA证书。

拦截器

服务注册与服务发现

服务及其调用方直接与注册中心交互

image-20231212153923974

image-20231212153952711

通过部署基础设施来处理服务发现

image-20231212154358522

image-20231212154651038

主流注册中心对比

image-20231212221020154

上图所说的CP是值CAP理论中的CP

CAP理论

  • 一致性(Consistency):所有节点在同一个时间具有相同的数据
  • 可用性(Availability):保证每个请求不管成功或者失败都有相应
  • 分区容忍性(Partotion tolerance):系统中任意的信息丢失或者是失败都不会影响系统的继续运转

consul服务注册

image-20231213134845364

注册consul节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package main

import (
"context"
"fmt"
"github.com/hashicorp/consul/api"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
"hello_server/pb"
"log"
"net"
)

const serviceName = "hello_server"

type server struct {
pb.UnimplementedGreeterServer
}

func (server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloResponse, error) {
reply := "hello," + in.Name
return &pb.HelloResponse{Reply: reply}, nil
}

// GetOutboundIP 获取本机的出口IP
func GetOutboundIP() (net.IP, error) {
conn, err := net.Dial("udp", "8.8.8.8:80")
if err != nil {
return nil, err
}
defer conn.Close()
localAddr := conn.LocalAddr().(*net.UDPAddr)
return localAddr.IP, nil
}

func main() {
l, err := net.Listen("tcp", ":8977")
if err != nil {
log.Fatal("failed to listen,err", err)
return
}
//创建grpc服务
s := grpc.NewServer()
// 注册grpc服务
pb.RegisterGreeterServer(s, &server{})

healthcheck := health.NewServer()
healthpb.RegisterHealthServer(s, healthcheck)
//注册consul节点
cc, err := api.NewClient(api.DefaultConfig())
if err != nil {
fmt.Println("api.NewClient failed", err)
return
}
//定义服务 配置健康检查
IP, err := GetOutboundIP()
if err != nil {
fmt.Println("GetOutboundIP failed", err)
return
}
fmt.Println(IP.String())
check := &api.AgentServiceCheck{
GRPC: fmt.Sprintf("%s:%d", IP.String(), 8977), // 这里一定是外部可以访问的地址
Timeout: "10s", // 超时时间
Interval: "10s", // 运行检查的频率
// 指定时间后自动注销不健康的服务节点
// 最小超时时间为1分钟,收获不健康服务的进程每30秒运行一次,因此触发注销的时间可能略长于配置的超时时间。
DeregisterCriticalServiceAfter: "1m",
}
srv := &api.AgentServiceRegistration{
ID: fmt.Sprintf("%s-%s-%d", serviceName, IP.String(), 8977), // 服务唯一ID
Name: serviceName, // 服务名称
Tags: []string{"sayhello", "Forrest"}, // 为服务打标签
Address: IP.String(),
Port: 8977,
Check: check,
}
//将服务注册到consul中
err = cc.Agent().ServiceRegister(srv)

if err != nil {
fmt.Println(" cc.Agent().ServiceRegister(srv) failed", err)
return
}
//启动服务
err = s.Serve(l)
if err != nil {
log.Fatal("failed to Serve,err", err)
return
}
}

服务发现

gRPC支持自定义resolver,借助第三方的grpc-consul-resolver库,我们可以更便捷的实现基于consul的服务发现。

1
import _ "github.com/mbobakov/grpc-consul-resolver"
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package main

import (
"context"
"flag"
"fmt"
"github.com/Forrest/backend/yst/hello_client/pb"
_ "github.com/mbobakov/grpc-consul-resolver" //服务发现
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"log"
"time"
)

var name = flag.String("name", "yst", "通过 -name指定你的名字")

func main() {
flag.Parse()
// 连接server

/* cc, err := api.NewClient(api.DefaultConfig())
if err != nil {
fmt.Println("NewClient failed,err", err)
return
}
serviceMap, err := cc.Agent().ServicesWithFilter("Service==`hello_server`")
if err != nil {
fmt.Println("ServicesWithFilter failed", err)
return
}
// 选一个服务机(这里选最后一个)
var addr string
for k, v := range serviceMap {
fmt.Printf("%s:%#v\n", k, v)
addr = v.Address + ":" + strconv.Itoa(v.Port)
}*/
/* conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))*/

conn, err := grpc.Dial(
"consul://localhost:8500/hello_server",
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
log.Fatal("grpc.Dail failed,err:", err)
return
}
defer conn.Close()

c := pb.NewGreeterClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
resp, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name})
if err != nil {
log.Printf("c.SayHrllo failed,err:%v", err)
}
fmt.Println("resp:", resp.GetReply())
}

grpc.Dial时直接使用类似 consul://[user:password@]127.0.0.127:8555/my-service?[healthy=]&[wait=]&[near=]&[insecure=]&[limit=]&[tag=]&[token=]的连接字符串来指定连接目标。

目前支持的参数:

Name 格式 介绍
tag string 根据标签筛选
healthy true/false 只返回通过所有健康检查的端点。默认值:false
wait time.ParseDuration 监控变更的等待时间。在这个时间段内,端点将强制刷新。默认值:继承agent的配置
insecure true/false 允许与consul进行不安全的通信。默认值:true
near string 按响应持续时间对端点排序。可与“limit”参数有效结合。默认值:”_agent”
limit int 限制服务的端点数。默认值:无限制
timeout time.ParseDuration Http-client超时。默认值:60s
max-backoff time.ParseDuration 重新连接到consul的最大后退时间。重连从10ms开始,成倍增长,直到到max-backoff。默认值:1s
token string Consul token
dc string consul数据中心。可选
allow-stale true/false 允许agent返回过期读的结果 https://developer.hashicorp.com/consul/api-docs/features/consistency#stale
require-consistent true/false 强制读取完全一致。这比较昂贵,但可以防止执行过期读操作

kratos 框架

新建一个项目

1
kratos new bubble 

kratos中使用 proto文件 来完成通信 ,新建一个proto文件

1
kratos proto add api/bubble/v1/todo.proto
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
syntax = "proto3";
package api.bubble.v1;
import "google/api/annotations.proto";
option go_package = "bubble/api/bubble/v1;v1";
option java_multiple_files = true;
option java_package = "api.bubble.v1";
service Todo {
rpc CreateTodo (CreateTodoRequest) returns (CreateTodoReply){
option (google.api.http) = {
post: "/v1/todo",
body: "*",
};
}
rpc UpdateTodo (UpdateTodoRequest) returns (UpdateTodoReply){
option (google.api.http) = {
put: "/v1/todo/{id}",
body: "*",
};
}
rpc DeleteTodo (DeleteTodoRequest) returns (DeleteTodoReply){
option (google.api.http) = {
delete: "/v1/todo/{id}",
};
}
rpc GetTodo (GetTodoRequest) returns (GetTodoReply){
option (google.api.http) = {
get: "/v1/todo/{id}",
};
}
rpc ListTodo (ListTodoRequest) returns (ListTodoReply){
option (google.api.http) = {
get: "/v1/todos",
};
}
}
message todo {
int64 id = 1;
string title = 2;
bool status = 3;
}
message CreateTodoRequest {
string title = 1;
}
message CreateTodoReply {
todo todo = 1;
}
message UpdateTodoRequest {
int64 id = 1;
string title = 2;
bool status = 3;
}
message UpdateTodoReply {
}
message DeleteTodoRequest {
int64 id = 1;
}

message DeleteTodoReply{}
message GetTodoRequest {
int64 id = 1;
}
message GetTodoReply {
todo todo = 1;
}
message ListTodoRequest {}
message ListTodoReply {
repeated todo data = 1;
}

生成proto代码

1
kratos proto client api/bubble/v1/todo.proto

通过proto文件,可以直接生成对应的Service实现代码;使用kratos命令并且通过 -t 指定生成代码的保存目录

1
2
3
kratos proto server api/bubble/v1/todo.proto -t internal/service

kratos proto client api/review/v1/review_error.proto

server 端完整实例

服务注册 健康检查 接受退出服务信号服务注销

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package main

import (
"context"
"fmt"
"github.com/hashicorp/consul/api"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
"hello_server/pb"
"log"
"net"
"os"
"os/signal"
"syscall"
)

const serviceName = "hello_server"

type server struct {
pb.UnimplementedGreeterServer
}

func (server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloResponse, error) {
reply := "hello," + in.Name
return &pb.HelloResponse{Reply: reply}, nil
}

type consul struct {
client *api.Client
}

func NewConsul(addr string) (*consul, error) {
cfg := api.DefaultConfig()
client, err := api.NewClient(cfg)
if err != nil {
return nil, err
}
return &consul{client: client}, nil
}

func (cc *consul) RegisterService(serviceName string, IP string, port int) error {
check := &api.AgentServiceCheck{
GRPC: fmt.Sprintf("%s:%d", IP, 8972), // 这里一定是外部可以访问的地址
Timeout: "10s", // 超时时间
Interval: "10s", // 运行检查的频率
// 指定时间后自动注销不健康的服务节点
// 最小超时时间为1分钟,收获不健康服务的进程每30秒运行一次,因此触发注销的时间可能略长于配置的超时时间。
DeregisterCriticalServiceAfter: "1m",
}
srv := &api.AgentServiceRegistration{
ID: fmt.Sprintf("%s-%s-%d", serviceName, IP, port), // 服务唯一ID
Name: serviceName, // 服务名称
Tags: []string{"important", "Forrest"}, // 为服务打标签
Address: IP,
Port: port,
Check: check,
}
//将服务注册到consul中
return cc.client.Agent().ServiceRegister(srv)
}

// GetOutboundIP 获取本机的出口IP
func GetOutboundIP() (net.IP, error) {
conn, err := net.Dial("udp", "8.8.8.8:80")
if err != nil {
return nil, err
}
defer conn.Close()
localAddr := conn.LocalAddr().(*net.UDPAddr)
return localAddr.IP, nil
}

// Deregister 注销服务
func (c *consul) Deregister(serviceID string) error {
return c.client.Agent().ServiceDeregister(serviceID)
}

func main() {
port := 8972
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
if err != nil {
fmt.Printf("failed to listen: %v", err)
return
}
s := grpc.NewServer() // 创建gRPC服务器
// 开启健康检查
healthcheck := health.NewServer()
healthpb.RegisterHealthServer(s, healthcheck)
pb.RegisterGreeterServer(s, &server{}) // 在gRPC服务端注册服务
// 注册服务
consul, err := NewConsul("127.0.0.1:8500")
if err != nil {
log.Fatalf("NewConsul failed, err:%v\n", err)
}
ipObj, _ := GetOutboundIP()
ip := ipObj.String()
serviceId := fmt.Sprintf("%s-%s-%d", "hello", ip, port)
fmt.Println(serviceId)
consul.RegisterService(serviceId, ip, port)

// 启动服务
go func() {
err = s.Serve(lis)
if err != nil {
log.Printf("failed to serve: %v", err)
return
}
}()

quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGTERM, syscall.SIGINT)
<-quit
// 退出时注销服务
consul.Deregister(serviceId)

}

依赖注入工具 wire

google开源,编译期间完成依赖注入。

provider(提供者)

可导出的

injector(注入器)

应用程序中使用一个注入器来提供 提供者,注入器就是一个按照依赖顺序调用提供者

使用 wire 时,只需要编写注入器的函数签名,然后 wire 会生成对应的 函数体

image-20231210122219231

image-20231210122530841

image-20231210132453233

项目实战

image-20231210140759957

image-20231210140750519

image-20231210141008431

系统分析

输入输出

image-20231210155901691

功能模块

image-20231210160346172

C端:用户端,包括发表评论和查看评论的用户

B端:商家端,店铺商家端,店铺管理者,商品发布者

O端:运营端,运营的同学在后台负责审核用户评论,处理商家申诉,以及评论进行运营活动

按照数据流动的角度去分析
电商

image-20231210161129270

读多写少

UGC评论系统

image-20231210161734299

读多写多

管理项目于代码的方式

  • mono-repo (monolithic repository)

  • multi-repo +submodule

项目中如何管理pb文件

  • protobuf文件要一致
  • protoc版本一致

通常公司中是把 proto文件和生成的不同语言的代码都放在一个独立的公用代码库

别的项目直接引用这个公用的代码

1
git submodule add git@github.com:Q1mi/reviewapis.git ./api

更新submodule

1
git submodule update

用来初始化本地 配置文件

1
git submodule init

添加到安全组

1
git config --global --add safe.directory D:/Go_WorkSpace/review-b

代码实现

基本框架

  • 创建新项目
1
kratos new review
  • 添加proto文件
1
kratos proto add api/review/v1/review.proto
  • 书写proto文件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    syntax = "proto3";

    package api.review.v1;
    import "google/api/annotations.proto";
    import "validate/validate.proto";
    option go_package = "review/api/review/v1;v1";
    option java_multiple_files = true;
    option java_package = "api.review.v1";

    service Review {
    rpc CreateReview (CreateReviewRequest) returns (CreateReviewReply){
    option (google.api.http) = {
    post: "/v1/review",
    body: "*"
    };
    };
    rpc UpdateReview (UpdateReviewRequest) returns (UpdateReviewReply);
    rpc DeleteReview (DeleteReviewRequest) returns (DeleteReviewReply);
    rpc GetReview (GetReviewRequest) returns (GetReviewReply);
    rpc ListReview (ListReviewRequest) returns (ListReviewReply);
    }

    message CreateReviewRequest {
    int64 userID = 1 [(validate.rules).int64 = {gt: 0}];
    int64 orderID = 2 [(validate.rules).int64 = {gt: 0}];
    int32 score = 3 [(validate.rules).int32 = {in: [1,2,3,4,5]}];
    int32 serviceScore = 4 [(validate.rules).int32 = {in: [1,2,3,4,5]}];
    int32 expressScore = 5 [(validate.rules).int32 = {in: [1,2,3,4,5]}];
    string content = 6 [(validate.rules).string = {min_len: 8, max_len: 255}];
    string picInfo = 7;
    string videoInfo = 8;
    bool anonymous = 9;
    }
    message CreateReviewReply {
    int64 reviewID = 1;
    }

    message UpdateReviewRequest {}
    message UpdateReviewReply {}

    message DeleteReviewRequest {}
    message DeleteReviewReply {}

    message GetReviewRequest {}
    message GetReviewReply {}

    message ListReviewRequest {}
    message ListReviewReply {}
  • 生成client代码

1
kratos proto client api/review/v1/review.proto
  • 生成service代码
1
kratos proto server api/review/v1/review.proto  -t internal/service

server- > service -> biz -> data

wire

gen data层的配置

服务注册

  • 配置文件

    • yaml
    • proto
    • 生成 pb文件
  • server 层

    • 写 Registry的构造函数

    • 添加到 provider中

    •   // ProviderSet is server providers.
        var ProviderSet = wire.NewSet(NewGRPCServer, NewHTTPServer, NewRegistrar)
        
        func NewRegistrar(conf *conf.Registry) registry.Registrar {
            c := consulAPI.DefaultConfig()
            c.Address = conf.Consul.Address
            c.Scheme = conf.Consul.Scheme
            cli, err := consulAPI.NewClient(c)
            if err != nil {
               panic(err)
            }
            r := consul.New(cli, consul.WithHealthCheck(false))
            return r
        }
        
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16

      - cmd

      - Name Version

      - ```go
      var (
      // Name is the name of the compiled software.
      Name = "review.service"
      // Version is the version of the compiled software.
      Version string = "v0.1"
      // flagconf is the config flag.
      flagconf string

      id, _ = os.Hostname()
      )
    • newApp 参数填充

      •   func newApp(logger log.Logger, rr registry.Registrar, gs *grpc.Server, hs *http.Server) *kratos.App {
              return kratos.New(
                 kratos.ID(id),
                 kratos.Name(Name),
                 kratos.Version(Version),
                 kratos.Metadata(map[string]string{}),
                 kratos.Logger(logger),
                 kratos.Server(
                    gs,
                    hs,
                 ),
                 kratos.Registrar(rr),
              )
          }
          
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10

        - 解析 pb文件结构体

        - ```go
        var rc conf.Registry
        if err := c.Scan(&rc); err != nil {
        panic(err)
        }

        app, cleanup, err := wireApp(bc.Server, &rc, bc.Data, logger)
    • 填充 wireApp

      •   // wireApp init kratos application.
          func wireApp(*conf.Server, *conf.Registry, *conf.Data, log.Logger) (*kratos.App, func(), error) {
              panic(wire.Build(server.ProviderSet, data.ProviderSet, biz.ProviderSet, service.ProviderSet, newApp))
          }
          
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        32
        33
        34
        35
        36
        37
        38
        39
        40
        41
        42
        43
        44
        45
        46
        47
        48
        49
        50
        51
        52
        53
        54
        55
        56
        57
        58
        59
        60
        61
        62
        63
        64
        65
        66
        67
        68
        69
        70
        71
        72
        73
        74
        75
        76
        77
        78
        79
        80
        81
        82
        83
        84
        85
        86
        87
        88
        89
        90
        91
        92
        93
        94
        95
        96
        97
        98
        99
        100
        101
        102
        103
        104
        105
        106
        107
        108
        109
        110











        #### 项目sql

        ```sql
        CREATE TABLE review_info (
        `id` bigint(32) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
        `create_by` varchar(48) NOT NULL DEFAULT '' COMMENT '创建方标识',
        `update_by` varchar(48) NOT NULL DEFAULT '' COMMENT '更新方标识',
        `create_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
        `update_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
        `delete_at` timestamp COMMENT '逻辑删除标记',
        `version` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '乐观锁标记',

        `review_id` bigint(32) NOT NULL DEFAULT '0' COMMENT '评价id',
        `content` varchar(512) NOT NULL COMMENT '评价内容',
        `score` tinyint(4) NOT NULL DEFAULT '0' COMMENT '评分',
        `service_score` tinyint(4) NOT NULL DEFAULT '0' COMMENT '商家服务评分',
        `express_score` tinyint(4) NOT NULL DEFAULT '0' COMMENT '物流评分',
        `has_media` tinyint(4) NOT NULL DEFAULT '0' COMMENT '是否有图或视频',
        `order_id` bigint(32) NOT NULL DEFAULT '0' COMMENT '订单id',
        `sku_id` bigint(32) NOT NULL DEFAULT '0' COMMENT 'sku id',
        `spu_id` bigint(32) NOT NULL DEFAULT '0' COMMENT 'spu id',
        `store_id` bigint(32) NOT NULL DEFAULT '0' COMMENT '店铺id',
        `user_id` bigint(32) NOT NULL DEFAULT '0' COMMENT '用户id',
        `anonymous` tinyint(4) NOT NULL DEFAULT '0' COMMENT '是否匿名',
        `tags` varchar(1024) NOT NULL DEFAULT '' COMMENT '标签json',
        `pic_info` varchar(1024) NOT NULL DEFAULT '' COMMENT '媒体信息:图片',
        `video_info` varchar(1024) NOT NULL DEFAULT '' COMMENT '媒体信息:视频',
        `status` tinyint(4) NOT NULL DEFAULT '10' COMMENT '状态:10待审核;20审核通过;30审核不通过;40隐藏',
        `is_default` tinyint(4) NOT NULL DEFAULT '0' COMMENT '是否默认评价',
        `has_reply` tinyint(4) NOT NULL DEFAULT '0' COMMENT '是否有商家回复:0无;1有',
        `op_reason` varchar(512) NOT NULL DEFAULT '' COMMENT '运营审核拒绝原因',
        `op_remarks` varchar(512) NOT NULL DEFAULT '' COMMENT '运营备注',
        `op_user` varchar(64) NOT NULL DEFAULT '' COMMENT '运营者标识',

        `goods_snapshoot` varchar(2048) NOT NULL DEFAULT '' COMMENT '商品快照信息',
        `ext_json` varchar(1024) NOT NULL DEFAULT '' COMMENT '信息扩展',
        `ctrl_json` varchar(1024) NOT NULL DEFAULT '' COMMENT '控制扩展',
        PRIMARY KEY (`id`),
        KEY `idx_delete_at` (`delete_at`) COMMENT '逻辑删除索引',
        UNIQUE KEY `uk_review_id` (`review_id`) COMMENT '评价id索引',
        KEY `idx_order_id` (`order_id`) COMMENT '订单id索引',
        KEY `idx_user_id` (`user_id`) COMMENT '用户id索引'
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='评价表';

        CREATE TABLE review_reply_info (
        `id` bigint(32) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
        `create_by` varchar(48) NOT NULL DEFAULT '' COMMENT '创建方标识',
        `update_by` varchar(48) NOT NULL DEFAULT '' COMMENT '更新方标识',
        `create_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
        `update_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
        `delete_at` timestamp COMMENT '逻辑删除标记',
        `version` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '乐观锁标记',

        `reply_id` bigint(32) NOT NULL DEFAULT '0' COMMENT '回复id',
        `review_id` bigint(32) NOT NULL DEFAULT '0' COMMENT '评价id',
        `store_id` bigint(32) NOT NULL DEFAULT '0' COMMENT '店铺id',
        `content` varchar(512) NOT NULL COMMENT '评价内容',
        `pic_info` varchar(1024) NOT NULL DEFAULT '' COMMENT '媒体信息:图片',
        `video_info` varchar(1024) NOT NULL DEFAULT '' COMMENT '媒体信息:视频',

        `ext_json` varchar(1024) NOT NULL DEFAULT '' COMMENT '信息扩展',
        `ctrl_json` varchar(1024) NOT NULL DEFAULT '' COMMENT '控制扩展',
        PRIMARY KEY (`id`),
        KEY `idx_delete_at` (`delete_at`) COMMENT '逻辑删除索引',
        UNIQUE KEY `uk_reply_id` (`reply_id`) COMMENT '回复id索引',
        KEY `idx_review_id` (`review_id`) COMMENT '评价id索引',
        KEY `idx_store_id` (`store_id`) COMMENT '店铺id索引'
        )ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='评价商家回复表';


        CREATE TABLE review_appeal_info (
        `id` bigint(32) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
        `create_by` varchar(48) NOT NULL DEFAULT '' COMMENT '创建方标识',
        `update_by` varchar(48) NOT NULL DEFAULT '' COMMENT '更新方标识',
        `create_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
        `update_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
        `delete_at` timestamp COMMENT '逻辑删除标记',
        `version` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '乐观锁标记',

        `appeal_id` bigint(32) NOT NULL DEFAULT '0' COMMENT '回复id',
        `review_id` bigint(32) NOT NULL DEFAULT '0' COMMENT '评价id',
        `store_id` bigint(32) NOT NULL DEFAULT '0' COMMENT '店铺id',
        `status` tinyint(4) NOT NULL DEFAULT '10' COMMENT '状态:10待审核;20申诉通过;30申诉驳回',
        `reason` varchar(255) NOT NULL COMMENT '申诉原因类别',
        `content` varchar(255) NOT NULL COMMENT '申诉内容描述',
        `pic_info` varchar(1024) NOT NULL DEFAULT '' COMMENT '媒体信息:图片',
        `video_info` varchar(1024) NOT NULL DEFAULT '' COMMENT '媒体信息:视频',

        `op_remarks` varchar(512) NOT NULL DEFAULT '' COMMENT '运营备注',
        `op_user` varchar(64) NOT NULL DEFAULT '' COMMENT '运营者标识',

        `ext_json` varchar(1024) NOT NULL DEFAULT '' COMMENT '信息扩展',
        `ctrl_json` varchar(1024) NOT NULL DEFAULT '' COMMENT '控制扩展',
        PRIMARY KEY (`id`),
        KEY `idx_delete_at` (`delete_at`) COMMENT '逻辑删除索引',
        KEY `idx_appeal_id` (`appeal_id`) COMMENT '申诉id索引',
        UNIQUE KEY `uk_review_id` (`review_id`) COMMENT '评价id索引',
        KEY `idx_store_id` (`store_id`) COMMENT '店铺id索引'
        )ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='评价商家申诉表';

表结构

1
desc table_name;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# review_appeal_info
+------------+---------------------+------+-----+---------------------+-----------------------------+
| Field | Type | Null | Key | Default | Extra |
+------------+---------------------+------+-----+---------------------+-----------------------------+
| id | bigint(32) unsigned | NO | PRI | NULL | auto_increment |
| create_by | varchar(48) | NO | | | |
| update_by | varchar(48) | NO | | | |
| create_at | timestamp | NO | | CURRENT_TIMESTAMP | |
| update_at | timestamp | NO | | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP |
| delete_at | timestamp | NO | MUL | 0000-00-00 00:00:00 | |
| version | int(10) unsigned | NO | | 0 | |
| appeal_id | bigint(32) | NO | MUL | 0 | |
| review_id | bigint(32) | NO | UNI | 0 | |
| store_id | bigint(32) | NO | MUL | 0 | |
| status | tinyint(4) | NO | | 10 | |
| reason | varchar(255) | NO | | NULL | |
| content | varchar(255) | NO | | NULL | |
| pic_info | varchar(1024) | NO | | | |
| video_info | varchar(1024) | NO | | | |
| op_remarks | varchar(512) | NO | | | |
| op_user | varchar(64) | NO | | | |
| ext_json | varchar(1024) | NO | | | |
| ctrl_json | varchar(1024) | NO | | | |
+------------+---------------------+------+-----+---------------------+-----------------------------+
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# review_reply_info
+------------+---------------------+------+-----+---------------------+-----------------------------+
| Field | Type | Null | Key | Default | Extra |
+------------+---------------------+------+-----+---------------------+-----------------------------+
| id | bigint(32) unsigned | NO | PRI | NULL | auto_increment |
| create_by | varchar(48) | NO | | | |
| update_by | varchar(48) | NO | | | |
| create_at | timestamp | NO | | CURRENT_TIMESTAMP | |
| update_at | timestamp | NO | | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP |
| delete_at | timestamp | NO | MUL | 0000-00-00 00:00:00 | |
| version | int(10) unsigned | NO | | 0 | |
| reply_id | bigint(32) | NO | UNI | 0 | |
| review_id | bigint(32) | NO | MUL | 0 | |
| store_id | bigint(32) | NO | MUL | 0 | |
| content | varchar(512) | NO | | NULL | |
| pic_info | varchar(1024) | NO | | | |
| video_info | varchar(1024) | NO | | | |
| ext_json | varchar(1024) | NO | | | |
| ctrl_json | varchar(1024) | NO | | | |
+------------+---------------------+------+-----+---------------------+-----------------------------+
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#review_info
+-----------------+---------------------+------+-----+---------------------+-----------------------------+
| Field | Type | Null | Key | Default | Extra |
+-----------------+---------------------+------+-----+---------------------+-----------------------------+
| id | bigint(32) unsigned | NO | PRI | NULL | auto_increment |
| create_by | varchar(48) | NO | | | |
| update_by | varchar(48) | NO | | | |
| create_at | timestamp | NO | | CURRENT_TIMESTAMP | |
| update_at | timestamp | NO | | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP |
| delete_at | timestamp | NO | MUL | 0000-00-00 00:00:00 | |
| version | int(10) unsigned | NO | | 0 | |
| review_id | bigint(32) | NO | UNI | 0 | |
| content | varchar(512) | NO | | NULL | |
| score | tinyint(4) | NO | | 0 | |
| service_score | tinyint(4) | NO | | 0 | |
| express_score | tinyint(4) | NO | | 0 | |
| has_media | tinyint(4) | NO | | 0 | |
| order_id | bigint(32) | NO | MUL | 0 | |
| sku_id | bigint(32) | NO | | 0 | |
| spu_id | bigint(32) | NO | | 0 | |
| store_id | bigint(32) | NO | | 0 | |
| user_id | bigint(32) | NO | MUL | 0 | |
| anonymous | tinyint(4) | NO | | 0 | |
| tags | varchar(1024) | NO | | | |
| pic_info | varchar(1024) | NO | | | |
| video_info | varchar(1024) | NO | | | |
| status | tinyint(4) | NO | | 10 | |
| is_default | tinyint(4) | NO | | 0 | |
| has_reply | tinyint(4) | NO | | 0 | |
| op_reason | varchar(512) | NO | | | |
| op_remarks | varchar(512) | NO | | | |
| op_user | varchar(64) | NO | | | |
| goods_snapshoot | varchar(2048) | NO | | | |
| ext_json | varchar(1024) | NO | | | |
| ctrl_json | varchar(1024) | NO | | | |
+-----------------+---------------------+------+-----+---------------------+-----------------------------+

待续。。。

  • gen实现数据库表和字段的映射
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// cmd/gen/generato
package main

// gorm gen configure

import (
"fmt"

"gorm.io/driver/mysql"
"gorm.io/gorm"

"gorm.io/gen"
)

const MySQLDSN = "root:571400yst@tcp(127.0.0.1:3306)/review_service?charset=utf8mb4&parseTime=True"

func connectDB(dsn string) *gorm.DB {
db, err := gorm.Open(mysql.Open(dsn))
if err != nil {
panic(fmt.Errorf("connect db fail: %w", err))
}
return db
}

func main() {
// 指定生成代码的具体相对目录(相对当前文件),默认为:./query
// 默认生成需要使用WithContext之后才可以查询的代码,但可以通过设置gen.WithoutContext禁用该模式
g := gen.NewGenerator(gen.Config{
// 默认会在 OutPath 目录生成CRUD代码,并且同目录下生成 model 包
// 所以OutPath最终package不能设置为model,在有数据库表同步的情况下会产生冲突
// 若一定要使用可以通过ModelPkgPath单独指定model package的名称
OutPath: "../../internal/data/query",
/* ModelPkgPath: "dal/model"*/

// gen.WithoutContext:禁用WithContext模式
// gen.WithDefaultQuery:生成一个全局Query对象Q
// gen.WithQueryInterface:生成Query接口
Mode: gen.WithDefaultQuery | gen.WithQueryInterface,
})

// 通常复用项目中已有的SQL连接配置db(*gorm.DB)
// 非必需,但如果需要复用连接时的gorm.Config或需要连接数据库同步表信息则必须设置
g.UseDB(connectDB(MySQLDSN))

// 从连接的数据库为所有表生成Model结构体和CRUD代码
// 也可以手动指定需要生成代码的数据表
g.ApplyBasic(g.GenerateAllTable()...)

// 执行并生成代码
g.Execute()
}

GEN的使用

错误处理

snowflake算法

pb 的validate

参数校验 | Kratos (go-kratos.dev)

  • 根据validate的规则给probuf文件写 validate
  • make validate,生成validate的pb文件
  • grpc http中插入 validate.Validator(),

copier库

分页处理

1
2
limit = size
offset = (page - 1) * size

C端

  • 用户创建评价
  • 用户查询根据userID查询自己的评价
  • 用户根据OrderID查询评价
  • 查询评价回复

B端

  • 回复评价
    • 字段 reviewID、storeID、content、picInfo、vidInfo
    • 查看评价是否存在
    • 水平鉴权
    • 保存到数据库中
  • 申诉评价
    • 查看该评论的申诉是否存在
      • 存在

        • 查看状态
          • 申诉通过 || 申诉驳回

            • 返回
          • 待审核

            • update
      • 不存在

        • insert

O端

  • AuditReview

    • 更新review表的字段
  • AuditAppeal

    • 更新 appeal表的状态
    • 如果申诉成功,则更改info表的状态,显示为 隐藏

review-job/ review-task

后端服务的分类

  • service :微服务,提供API RPC接口
  • job:流式服务,处理实时流
    • 检查数据的变更,将多张表的数据整合 加工 处理 组成成一个 document,存入ES中
  • task 定时任务,定时处理任务

start

stop

go-kafaka

canal

配置操作
  • 安装canal前我们先开启MySql的 binlog,在MySQL配置文件my.cnf设置如下信息:

    1
    2
    3
    4
    5
    6
    7
    [mysqld] 
    # 打开binlog
    log-bin=mysql-bin
    # 选择ROW(行)模式
    binlog-format=ROW
    # 配置MySQL replaction需要定义,不要和canal的slaveId重复
    server_id=1
  • 查看是否有开启bin_log

    1
    show variables like 'log_bin';
  • 查看bin_log

    1
    show binary logs;
  • 在mysql中给canal单独建一个用户,给全库全表的读,拷贝,复制的权限

    1
    2
    3
    4
    5
    6
    -- 使用命令登录:
    mysql -u root -p
    -- 创建用户 用户名:canal 密码:Canal@123456
    create user 'canal'@'%' identified by 'Canal@123456';
    -- 授权 *.*表示所有库
    grant SELECT, REPLICATION SLAVE, REPLICATION CLIENT on *.* to 'canal'@'%' identified by 'Canal@123456';

查询 操作

  • 通过singleFlight合并大量相同的并发查询
    • 使用redis查询
      • 缓存命中,直接返回
      • 缓存未命中
        • 使用es查询
        • 将查询存入cache中

image-20231231112216683

Canal mysql的增量数据解析工具

**canal [kə’næl]**,译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

image-20231222142823491

MySQL主备复制原理

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

canal 工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

待续。。。。

1
show variables like 'log_bin';

为什么要使用 ES来 实现O端的数据查询?

  • O端需要 多条件的 复制查询(多字段 跨表 多数据 ->超时)

CQRS架构是什么

image-20231222145336075

是 命令查询职责分离 (command QueryResponsibility Segregation)的缩写,在基于CQRS 的系统中,命令(写操作)和查询操作(读操作)所使用的数据模型是由区别的。通过各种机制将命令模型中的变更传播到查询模型中,让两个模型保持一致。

ElasticSearch

ElasticSearch架构主要由三个组件构成:索引、分片和节点

  • 索引是文档的逻辑分组,类似于数据库的表
  • 分片是索引的物理分区,用于提高数据库的分布和性能提升
  • 节点是运行ElasticSearch的节点

image-20231222152752038

工作原理

  1. 接受用户的查询请求:ElasticSearch通过RESTful API 或者是 JSON请求接受用户到查询请求
  2. 路由请求:接收到查询请求后,ElasticSearch根据请求中的索引和分区的信息,将请求路由到相应的节点
  3. 执行查询:节点执行查询请求,并在相应的索引中 查找匹配的文档
  4. 返回结果:结果以json的格式返回给用户,包括匹配的文档和相应字段信息

我的踩坑记录

  • mysql的DSN没加charset=utf8mb4&parseTime=True 报错gorm sql: Scan error on column index 3, name "create_at": unsupported Scan, storing driver.Value type []uint8 into type *time.Time

  • makefile的插件 对空格的识别问题

  • 记得在httpSrv和grpc中添加 validate.Validator(),

  • server 层进行格式转换(proto的数据转为GEN的数据格式)

  • 空指针异常 这行代码只是分配了内存空间,并没有为每个元素分配内存。因此,reviewInfos 中的每个元素都是 nil,即空指针。

    • 方法一

      1
      2
      3
      4
      5
      6
      reviewInfos := make([]*ReviewInfo, len(reviews))

      // 循环遍历切片,并为每个元素分配内存
      for i := range reviews {
      reviewInfos[i] = &ReviewInfo{} // 这里可以根据实际情况初始化 ReviewInfo 的字段
      }
    • 方法二image-20231223101615268

  • limit 因该在 offset前

    •   SELECT * FROM review_info ORDER BY id LIMIT 3 OFFSET 4;		
        
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13

      - 使用 `lmimit offset,rows` 也可也达到同样的效果



      ### GEN 的使用

      #### 基本使用

      安装依赖

      ```bash
      go get -u gorm.io/gen

创建基本表

1
2
3
4
5
6
7
8
9
CREATE TABLE book
(
`id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`title` varchar(128) NOT NULL COMMENT '书籍名称',
`author` varchar(128) NOT NULL COMMENT '作者',
`price` int NOT NULL DEFAULT '0' COMMENT '价格',
`publish_date` datetime COMMENT '出版日期',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='书籍表';

配置GEN,生成对应代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package main

// gorm gen configure

import (
"fmt"

"gorm.io/driver/mysql"
"gorm.io/gorm"

"gorm.io/gen"
)

const MySQLDSN = "root:571400yst@tcp(127.0.0.1:3306)/testdb1?charset=utf8mb4&parseTime=True"

func connectDB(dsn string) *gorm.DB {
db, err := gorm.Open(mysql.Open(dsn))
if err != nil {
panic(fmt.Errorf("connect db fail: %w", err))
}
return db
}

func main() {
// 指定生成代码的具体相对目录(相对当前文件),默认为:./query
// 默认生成需要使用WithContext之后才可以查询的代码,但可以通过设置gen.WithoutContext禁用该模式
g := gen.NewGenerator(gen.Config{
// 默认会在 OutPath 目录生成CRUD代码,并且同目录下生成 model 包
// 所以OutPath最终package不能设置为model,在有数据库表同步的情况下会产生冲突
// 若一定要使用可以通过ModelPkgPath单独指定model package的名称
OutPath: "../../dal/query",
/* ModelPkgPath: "dal/model"*/

// gen.WithoutContext:禁用WithContext模式
// gen.WithDefaultQuery:生成一个全局Query对象Q
// gen.WithQueryInterface:生成Query接口
Mode: gen.WithDefaultQuery | gen.WithQueryInterface,
})

// 通常复用项目中已有的SQL连接配置db(*gorm.DB)
// 非必需,但如果需要复用连接时的gorm.Config或需要连接数据库同步表信息则必须设置
g.UseDB(connectDB(MySQLDSN))

// 从连接的数据库为所有表生成Model结构体和CRUD代码
// 也可以手动指定需要生成代码的数据表
g.ApplyBasic(g.GenerateAllTable()...)

// 执行并生成代码
g.Execute()
}

目录结构

1
2
3
4
5
6
7
8
9
10
11
12
.
├── cmd
│   └── gen
│   └── generate.go
├── dal
│   ├── model
│   │   └── book.gen.go
│   └── query
│   ├── book.gen.go
│   └── gen.go
├── go.mod
└── go.sum

用一用生成的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package main

import (
"context"
"fmt"
"gen_demo/dal/model"
"gen_demo/dal/query"
"gorm.io/gen/examples/dal"
"time"
)

// gen demo

const MySQLDSN = "root:571400yst@tcp(127.0.0.1:3306)/testdb1?charset=utf8mb4&parseTime=True"

func init() {
dal.DB = dal.ConnectDB(MySQLDSN).Debug()
}

func main() {
// 设置默认DB对象
query.SetDefault(dal.DB)
b1 := &model.Book{
Title: "book1",
Author: "Forrest",
Price: 100,
PublishDate: time.Now(),
}
// create
err := query.Book.WithContext(context.Background()).Create(b1)
if err != nil {
fmt.Printf("Create book failed,err:%v\n", err)
return
}

//update
ret, err := query.Book.WithContext(context.Background()).
Where(query.Book.ID.Eq(1)).
Update(query.Book.Title, "book-2")
if err != nil {
fmt.Printf("update book failed,err:%v\n", err)
return
}
fmt.Printf("-> ret:%#v\n", ret)

//select
info, err := query.Book.WithContext(context.Background()).
Where(query.Book.ID.Eq(1)).First()
if err != nil {
fmt.Printf("select book failed,err:%v\n", err)
return
}
fmt.Printf("-> info:%#v\n", info)

//delete
ret, err = query.Book.WithContext(context.Background()).Where(query.Book.ID.Eq(1)).Delete()
if err != nil {
fmt.Printf("select book failed,err:%v\n", err)
return
}
fmt.Printf("-> ret:%#v\n", ret)

fmt.Printf("done...")
}

对应的目录结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
.
├── cmd
│   └── gen
│   └── generate.go
├── dal
│   ├── model
│   │   ├── book.gen.go
│   │   └── db.go
│   └── query
│   ├── book.gen.go
│   └── gen.go
├── go.mod
├── go.sum
└── main.go

自定义SQL查询

Gen框架使用模板注释的方法支持自定义SQL查询,我们只需要按对应规则将SQL语句注释到interface的方法上即可。Gen将对其进行解析,并为应用的结构生成查询API。

通常建议将自定义查询方法添加到model模块下

注释语法

Gen 为动态条件 SQL 支持提供了一些约定语法,分为三个方面:

  • 返回结果
  • 模板占位符
  • 模板表达式

返回结果

占位符 含义
gen.T 用于返回数据的结构体,会根据生成结构体或者数据库表结构自动生成
gen.M 表示map[string]interface{},用于返回数据
gen.RowsAffected 用于执行SQL进行更新或删除时候,用于返回影响行数
error 返回错误(如果有)

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// dal/model/querier.go

package model

import "gorm.io/gen"

// 通过添加注释生成自定义方法

type Querier interface {
// SELECT * FROM @@table WHERE id=@id
GetByID(id int) (gen.T, error) // 返回结构体和error

// GetByIDReturnMap 根据ID查询返回map
//
// SELECT * FROM @@table WHERE id=@id
GetByIDReturnMap(id int) (gen.M, error) // 返回 map 和 error

// SELECT * FROM @@table WHERE author=@author
GetBooksByAuthor(author string) ([]*gen.T, error) // 返回数据切片和 error
}

在cmd/gen/generate.go中添加自定义方法绑定关系

1
2
// 通过ApplyInterface添加为book表添加自定义方法
g.ApplyInterface(func(model.Querier) {}, g.GenerateModel("book"))

image-20231211162235232

重新生成代码后就能使用自定义方法了

名称 描述
@@table 转义和引用表名
@@<name> 从参数中转义并引用表/列名
@<name> 参数中的SQL查询参数

事务操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 事务操作
err = r.data.query.Transaction(func(tx *query.Query) error {
// 回复表插入一条数据
if err := tx.ReviewReplyInfo.
WithContext(ctx).
Save(reply); err != nil {
r.log.WithContext(ctx).Errorf("SaveReply create reply fail, err:%v", err)
return err
}
// 评价表更新hasReply字段
if _, err := tx.ReviewInfo.
WithContext(ctx).
Where(tx.ReviewInfo.ReviewID.Eq(reply.ReviewID)).
Update(tx.ReviewInfo.HasReply, 1); err != nil {
r.log.WithContext(ctx).Errorf("SaveReply update review fail, err:%v", err)
return err
}
return nil
})

go语言微服务与云原生
http://example.com/2023/12/07/go语言微服务与云原生/
作者
Forrest
发布于
2023年12月7日
许可协议