
docker 配置本地 etcd 集群并使用 clientapiv3 管理集群
一、用 docker 搭建集群
etcd 没有在 docker hub 中创建 image,所以自然拉取不到。
本文意在模拟使用步骤,所以创建三个 go 环境的容器,在每个容器中配置 etcd。
1 创建 go 容器
hub 中有 golang 镜像,可以直接拉取:
docker pull golang
拉取到的镜像是基于 debian buster 制作。
创建三个容器:
docker run -itd --name etcd1 golang docker run -itd --name etcd2 golang docker run -itd --name etcd3 golang
2 在每个镜像中 clone etcd
docker exec -it etcd1 bash docker exec -it etcd2 bash docker exec -it etcd3 bash
在每个容器中克隆:
git clone https://github.com/etcd-io/etcd.git
3 编绎 etcd
在每个容器中都需要执行下面的命令。
cd etcd ./build.sh
编绎脚本会拉取一些 golang 库,所以先设置好 goproxy 是非常有必要的。
go env -w GO111MODULE=on go env -w GOPROXY=https://goproxy.cn,direct
编绎完成后,会多一个 bin 目录,里面有两个可执行文件etcd
和etcdctl
,分别为服务端和客户端文件,搭建集群,使用的是etcd
。
4 写配置文件
etcd 的目录中有一个etcd.conf.yml.sample
是示例配置文件,将其复制一份用来作待使用的配置文件:
cp etcd.conf.yml.sample etcd.conf.yml
修改配置文件,下面只列出配置文件中需要修改的部分:
# 节点别名,给人类看的 name: 'infra3' # 数据文件存放的目录 data-dir: /var/lib/etcd # 用逗号分隔的 url 列表,用于与其他节点通信 listen-peer-urls: http://172.17.0.4:2380 # 用逗号分隔的 url 列表,用于与客户端通信 listen-client-urls: http://172.17.0.4:2379,http://localhost:2379 # 用逗号分隔的 url 列表,用于通知其他节点,与通信端口相同即可 initial-advertise-peer-urls: http://172.17.0.4:2380 # 用逗号分隔的 url 列表,用于公开通知客户端 advertise-client-urls: http://172.17.0.4:2379 # 初始化集群配置。集群各节点别名与其 url 的键值对列表 initial-cluster: infra1=http://172.17.0.2:2380,infra2=http://172.17.0.3:2380,infra3=http://172.17.0.4:2380 # 初始化集群 token initial-cluster-token: 'etcd-cluster-1'
5 使用配置文件运行各节点组成集群
在各个容器中分别执行:
bin/etcd --config-file etcd.conf.yml
集群应该已经正常运行了。
二、使用 clientv3 通信
1 客户端
客户端结构体Client
需要实现多个接口:
type Client struct { Cluster // Cluster 接口 KV // KV 接口 Lease // Lease 接口 Watcher // Watcher 接口 Auth // Auth 接口 Maintenance // Maintenance 接口 conn *grpc.ClientConn cfg Config creds grpccredentials.TransportCredentials resolver *resolver.EtcdManualResolver mu *sync.RWMutex ctx context.Context cancel context.CancelFunc // Username is a user name for authentication. Username string // Password is a password for authentication. Password string authTokenBundle credentials.Bundle callOpts []grpc.CallOption lgMu *sync.RWMutex lg *zap.Logger }
这些接口在创建Client
时实现:
func newClient(cfg *Config) (*Client, error) { ... client.Cluster = NewCluster(client) client.KV = NewKV(client) client.Lease = NewLease(client) client.Watcher = NewWatcher(client) client.Auth = NewAuth(client) client.Maintenance = NewMaintenance(client) ... }
2 写入
写入操作(put)包括创建和更新:key 不存在会创建,key 存在则更新。
Put
方法在 KV 接口中声明:
Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)
第一个参数是一个上下文,而Client
结构体内是有上下文属性的,在调用Put
方法时可以直接使用Client
的上下文属性,也可以定义一个新的上下文context.TODO()
、context.Background()
、context.WithCanle()
、context.WithTimeout()
等,根据实际需要定义上下文。
Client
的上下文属性会在client.Close()
时取消,而手动创建的上下文需要手动取消。
package main import ( "fmt" "time" clientv3 "go.etcd.io/etcd/client/v3" ) func main() { client, err := clientv3.New(clientv3.Config{ Endpoints: []string{"172.17.0.2:2379", "172.17.0.3:2379", "172.17.0.4:2379"}, DialTimeout: 5 * time.Second, }) if err != nil { panic(err) } defer client.Close() ctx := client.Ctx() // ctx, cancel := context.WithCancel(context.Background()) putResp, err := client.Put(ctx, "test", "test0") if err != nil { panic(err) } fmt.Println("response: ", putResp) fmt.Println("created or updated -> key: test, value: test0") // cancel() }
结果:
response: &{cluster_id:4577776708217222017 member_id:10491173242589593685 revision:22 raft_term:4 <nil> {} [] 0} created or updated -> key: test, value: test0
3 查询
与写入类似,调用Get
方法:
Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)
3.1 查询一个结果
func main() { ... ctx := client.Ctx() getResp, err := client.Get(ctx, "test") if err != nil { panic(err) } fmt.Println("get response:", getResp) }
结果:
get response: &{cluster_id:4577776708217222017 member_id:10491173242589593685 revision:2 raft_term:2 [key:"test" create_revision:2 mod_revision:2 version:1 value:"test0" ] false 1 {} [] 0}
3.2 查询包含前缀的多个结果
使用前缀查询时,就需要用到OpOption
参数了。
前缀查询的方法为:
func WithPrefix() OpOption { return func(op *Op) { if len(op.key) == 0 { op.key, op.end = []byte{0}, []byte{0} return } op.end = getPrefix(op.key) } }
向 etcd 中添加 test1 、 test2 、test3 后,使用前缀查询:
getResp, err := client.Get(ctx, "test", clientv3.WithPrefix()) if err != nil { panic(err) } for _, kv := range getResp.Kvs { fmt.Printf("%s = %s\n", kv.Key, string(kv.Value)) }
结果:
test = test0 test1 = 1 test2 = 2 test3 = 3
3.3 查询指定范围的多个结果
范围查询的方法为:
func WithRange(endKey string) OpOption { return func(op *Op) { op.end = []byte(endKey) } }
下面查询 test 到 test2 的结果:
getResp, err := client.Get(ctx, "test", clientv3.WithRange("test3")) // 不包含 test3 if err != nil { panic(err) } for _, kv := range getResp.Kvs { fmt.Printf("%s = %s\n", kv.Key, string(kv.Value)) }
结果:
test = test0 test1 = 1 test2 = 2
3.4 查询历史版本的值
这里就需要用到 put 或 get 响应中的Revision
属性了,代表每次修改的版本号,查询时指定想要查询的版本号即可查询对应历史版本的值。
版本号是全局版本号,不是某个值的版本号,任何一个值被创建、修改或删除,版本号都会增加 1 。
在 3.1 的响应中,我们能够看到每条查询结果中都有create_revision
和mod_revision
两个属性,分别对应的创建时的版本号和最后一次修改时的版本号,根据这两个版本号可以进行历史版本的查询:
getResp, err := client.Get(ctx, "test", clientv3.WithPrefix(), clientv3.WithRev(11)) if err != nil { panic(err) } for _, kv := range getResp.Kvs { fmt.Println(kv) }
需要注意的是,在响应中有一个属性revision
,代表当前最新版本号,要查询的版本号不能大于此值。
本例中,revision=12
,所以我们查询 11 号的值,结果为:
key:"test" create_revision:2 mod_revision:11 version:2 value:"00" key:"test1" create_revision:3 mod_revision:10 version:2 value:"10" key:"test2" create_revision:4 mod_revision:9 version:2 value:"20" key:"test3" create_revision:5 mod_revision:8 version:4 value:"30"
可以看到,查询到的结果最新版本号小于要查询的版本号时,返回的是最新版本的值。
3.5 查询大于等于某个键的值的所有结果
比如查询 key 大于等于 test2 的值:
getResp, err := client.Get(ctx, "test2", clientv3.WithFromKey()) if err != nil { panic(err) } for _, kv := range getResp.Kvs { fmt.Println(kv) }
结果:
key:"test2" create_revision:4 mod_revision:9 version:2 value:"20" key:"test3" create_revision:5 mod_revision:8 version:4 value:"30"
看一下WithFromKey()
方法的具体实现:
func WithFromKey() OpOption { return func(op *Op) { if len(op.key) == 0 { op.key = []byte{0} } op.end = []byte("\x00") } }
阅读Get
源码中可以知道,在调用Get
时,会初始化Op
结构体:
ret := Op{t: tRange, key: []byte(key)}
将传入的key
作为起始 key,WithFromKey()
将空字符串作为结束 key(意为无限大)。
由此可以查询到所有大于起始二进制 key 的 转为二进制后的 key。
3.6 其他 options
除上述 options 外,还有以下 options 可用:
WithSerializable
:让
Get
请求可序列化,能够降低服务端响应延迟
WithSort
:排序,需要与
WithPrefix
或
WithRange
合用,可以根据 key、version、revisions 或 value 进行排序
WithLimit
:限制查询结果的数量
WithLease
:为
Put
请求添加租约 id
WithKeysOnly
:只查询符合条件的所有 key,不查询对应的 value ...
源码中还有很多 options,不一一列举了,可以自行阅读源码:https://github.com/etcd-io/et...
4 租约
租约的意义就是节点的只能存活租约设置的时间。租约的存活时间 TTL 如果到期,租约就会过期,并且所有附带的节点都会被删除。
租约不影响集群版本。
4.1 批准租约
grantResp, err := client.Grant(ctx, 10) if err != nil { panic(err) } fmt.Println("新租约 ID:", grantResp.ID)
结果:
新租约 ID: 4635744352028668180
4.2 废除租约
可以使用租约 id 废除租约,废除时其附带的节点也会被删除。
revokeResp, err := client.Revoke(ctx, grantResp.ID) if err != nil { panic(err) } fmt.Println("租约已废除:", revokeResp)
结果:
租约已废除: &{cluster_id:4577776708217222017 member_id:10966956327032670700 revision:43 raft_term:2 {} [] 0}
4.3 获取租约剩余时间
租约过期时,剩余时间会是 -1。
liveResp, err := client.TimeToLive(ctx, grantResp.ID) if err != nil { panic(err) } fmt.Printf("剩余 %d 秒\n", liveResp.TTL) time.Sleep(6 * time.Second) liveResp, err = client.TimeToLive(ctx, grantResp.ID) if err != nil { panic(err) } fmt.Printf("剩余 %d 秒\n", liveResp.TTL)
结果:
新租约 ID: 2426447259829618775 剩余 4 秒 剩余 -1 秒
4.4 获取有效的所有租约
leasesResp, err := client.Leases(ctx) if err != nil { panic(err) } fmt.Println(leasesResp.Leases)
返回的Leases
结构体:
type LeaseStatus struct { ID LeaseID `json:"id"` // TODO: TTL int64 }
结果:
新租约 ID: 4635744352028668184 [{4635744352028668184}]
4.5 自动续约
自动续约KeepAlive
,会返回一个LeaseKeepAliveResponse
的只读管道,续约成功时后向这个管道内发送响应,我们只需要处理接收到的响应即可:
keepResp, err := client.KeepAlive(ctx, grantResp.ID) if err != nil { panic(err) } for resp := range keepResp { fmt.Println("续约成功:", resp) liveResp, err := client.TimeToLive(ctx, grantResp.ID) if err != nil { panic(err) } fmt.Printf("剩余 %d 秒\n", liveResp.TTL) }
结果:
续约成功: cluster_id:4577776708217222017 member_id:10966956327032670700 revision:43 raft_term:2 剩余 9 秒 续约成功: cluster_id:4577776708217222017 member_id:10966956327032670700 revision:43 raft_term:2 剩余 9 秒 续约成功: cluster_id:4577776708217222017 member_id:10966956327032670700 revision:43 raft_term:2 ...
4.6 续约一次
time.Sleep(6 * time.Second) liveResp, err := client.TimeToLive(ctx, grantResp.ID) if err != nil { panic(err) } fmt.Printf("剩余 %d 秒\n", liveResp.TTL) keepResp, err := client.KeepAliveOnce(ctx, grantResp.ID) if err != nil { panic(err) } fmt.Println("续约成功:", keepResp) liveResp, err = client.TimeToLive(ctx, grantResp.ID) if err != nil { panic(err) } fmt.Printf("剩余 %d 秒\n", liveResp.TTL)
结果:
剩余 3 秒 续约成功: cluster_id:4577776708217222017 member_id:10491173242589593685 revision:43 raft_term:2 剩余 9 秒
4.7 废除当前客户端批准的所有租约
释放所有租约的方法为:
client.Lease.Close()
但我测试时发现调用这个方法后,没有一个租约被释放,所以此条待完善,未能发现是我使用的问题,还是代码的问题。
5 删除
可以删除一个或符合条件的 key。
5.1 删除一个
delResp, err := client.Delete(ctx, "test") if err != nil { panic(err) } fmt.Println(delResp)
没有报错即为删除成功,但此处需要注意,删除一个不存在的键并不会报错,只是版本号不会发生变化,响应中的Deleted
属性值为 0。
5.2 删除多个
delResp, err := client.Delete(ctx, "test", clientv3.WithPrefix()) if err != nil { panic(err) } fmt.Printf("删除了 %d 个值\n", delResp.Deleted)
options 与Get
中类似。
结果:
删除了 4 个值
6 监听
etcd 的监听是Watch
方法。
6.1 监听一个节点
以监听test
节点为例:
wch := client.Watch(ctx, "test") for e := range wch { fmt.Println(e.Events[0]) }
监听后,会返回一个装有WatchResponse
的管道,我们只需要遍历这个管道,即可监听test
节点的所有变动。
WatchResponse
结构体中存储节点变动的是一个事件的切片,监听一个节点时,里面不会有多个事件。
运行程序后,用其他程序修改test
节点的值,监听程序会得到如下输出:
&{PUT key:"test" create_revision:24 mod_revision:27 version:4 value:"510" <nil> {} [] 0} &{PUT key:"test" create_revision:24 mod_revision:28 version:5 value:"10" <nil> {} [] 0} &{DELETE key:"test" mod_revision:29 <nil> {} [] 0}
6.2 监听多个节点
监听也可以使用 option,参考第 3 节中关于 option 的描述。
wch := client.Watch(ctx, "test", clientv3.WithPrefix()) for resp := range wch { for _, e := range resp.Events { // 监听多个节点,可能会有多个事件,所以这里对事件切片进行遍历 fmt.Println(e) } }
结果:
&{PUT key:"test" create_revision:37 mod_revision:38 version:2 value:"110" <nil> {} [] 0} &{PUT key:"test1" create_revision:34 mod_revision:39 version:2 value:"111" <nil> {} [] 0} &{DELETE key:"test" mod_revision:40 <nil> {} [] 0} &{DELETE key:"test1" mod_revision:40 <nil> {} [] 0} &{DELETE key:"test2" mod_revision:40 <nil> {} [] 0} &{DELETE key:"test3" mod_revision:40 <nil> {} [] 0}
6.3 从节点的指定历史版本开始监听
默认的监听是监听当前版本之后的变化,但一些场景下需要从历史版本开始监听。
比如,一个程序一直在监听 etcd 的某一个节点,运行过程中,程序发生异常退出,程序退出后到重新启动前是无法获取 etcd 的节点的所有改动的。如果就这样启动程序,程序获取到的是最新的版本,与程序退出前需要的参数并不一致,必然会引起其他问题,导致程序不能像预期一样运行。所以,程序应该在启动时,从退出前获得的最后一个版本号开始监听。
示例:
程序退出前,test
节点的版本号是 38,但随着其他程序的调用更新,当前集群的版本号已经被更新到了 40,程序重新启动时,就需要从 38 号开始监听:
wch := client.Watch(ctx, "test", clientv3.WithRev(38)) for resp := range wch { for _, e := range resp.Events { fmt.Println(e) } }
历史监听启动时会立即获取到从 38 到 40,test
节点的所有改动:
&{PUT key:"test" create_revision:37 mod_revision:38 version:2 value:"110" <nil> {} [] 0} &{DELETE key:"test" mod_revision:40 <nil> {} [] 0}
7 压缩版本号
etcd 默认情况下会保存历史版本号以便程序可以读取历史版本,但如果不加以控制,版本号会无限叠加,历史数据也会无限保存,这样的话就会产生大量的无用的历史数据。
为了避免这个问题,etcd 提供了压缩功能:删除指定版本之前的历史版本和历史数据,被删除的所有数据将无法访问。
比如,当前版本号是 43,我们想删除 42 之前的数据:
comResp, err := client.Compact(ctx, 42) if err != nil { panic(err) } fmt.Println(comResp) // 获取 42 之前的版本 getResp, err := client.Get(ctx, "test", clientv3.WithRev(40)) if err != nil { panic(err) } fmt.Println(getResp)
结果:
&{cluster_id:4577776708217222017 member_id:10491173242589593685 revision:42 raft_term:2 {} [] 0} {"level":"warn","ts":"2021-04-20T18:50:53.431+0800","caller":"[email protected]/retry_interceptor.go:62","msg":"retrying of unary invoker failed","target":"etcd-endpoints://0xc000140380/#initially=[172.17.0.2:2379;172.17.0.3:2379;172.17.0.4:2379]","attempt":0,"error":"rpc error: code = OutOfRange desc = etcdserver: mvcc: required revision has been compacted"} panic: etcdserver: mvcc: required revision has been compacted
可以看到,40 版本号已经被压缩,无法再访问。
8 事务
etcd 中事务是原子性过程,只支持If().Then().Else().Commit()
这种表达。
If
中支持传入多个比较条件Cmp
,如果条件都满足,执行Then
中的Op
,否则执行Else
中的Op
,最后Commit
。
四舍五入示例:
如果test
的值小于 5,将test
的值改为 0,否则改为 10。
txn := client.Txn(ctx) getResp, err := client.Get(ctx, "test") if err != nil { panic(err) } fmt.Println("事务前的值:", string(getResp.Kvs[0].Value)) txnResp, err := txn.If(clientv3.Compare(clientv3.Value("test"), "<", "5")). Then(clientv3.OpPut("test", "0")). Else(clientv3.OpPut("test", "9")). Commit() if err != nil { panic(err) } if txnResp.Succeeded { fmt.Println("小于 5,舍") } else { fmt.Println("大于 5,入") } getResp, err = client.Get(ctx, "test") if err != nil { panic(err) } fmt.Println("事务提交成功后的值:", string(getResp.Kvs[0].Value))
结果:
事务前的值: 5 大于 5,入 事务提交成功后的值: 9
9 用户和角色 / Auth 身份认证
etcd 中的用户可以授予角色权限。root 用户
auth 的完整示例(选自官方 example 文件):
unc main() { endpoints := []string{"172.17.0.2:2379", "172.17.0.3:2379", "172.17.0.4:2379"} client, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, DialTimeout: 5 * time.Second, }) if err != nil { panic(err) } defer client.Close() // 添加角色 root _, err = client.RoleAdd(context.TODO(), "root") if err != nil { panic(err) } fmt.Println("已添加角色:root") // 添加用户 _, err = client.UserAdd(context.TODO(), "root", "123") if err != nil { panic(err) } fmt.Println("已创建用户,用户名:root,密码:123") // 为用户授 root 授予 root 角色权限 _, err = client.UserGrantRole(context.TODO(), "root", "root") if err != nil { panic(err) } fmt.Println("已为用户 root 授予 root 角色权限") // 添加角色 r _, err = client.RoleAdd(context.TODO(), "r") if err != nil { panic(err) } fmt.Println("已添加角色:r") // 为角色 r 设置权限,对 [test, test3) 有写权限 _, err = client.RoleGrantPermission( context.TODO(), "r", "test", "test3", clientv3.PermissionType(clientv3.PermWrite), ) if err != nil { panic(err) } fmt.Println("角色 r 对节点 test 到 test3 添加写权限") // 添加用户 u _, err = client.UserAdd(context.TODO(), "u", "123") if err != nil { panic(err) } fmt.Println("已创建用户,用户名:u,密码:123") // 为用户 u 授予 角色 r 的权限 _, err = client.UserGrantRole(context.TODO(), "u", "r") if err != nil { panic(err) } fmt.Println("已为用户 u 授予角色 r 的权限") // 开启集权的身份认证 _, err = client.AuthEnable(context.TODO()) if err != nil { panic(err) } fmt.Println("已为集群开启权限认证") // 以指定用户创建客户端 authClient, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, DialTimeout: 5 * time.Second, Username: "u", Password: "123", }) if err != nil { panic(err) } defer authClient.Close() // 添加或修改节点 test1 _, err = authClient.Put(context.TODO(), "test1", "1") if err != nil { panic(err) } // 开启事务 _, err = authClient.Txn(context.TODO()). If(clientv3.Compare(clientv3.Value("test4"), ">", "1")). Then(clientv3.OpPut("test4", "yes")). Else(clientv3.OpPut("test4", "no")). Commit() fmt.Println(err) // 以 root 用户创建客户端 rootClient, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, DialTimeout: 5 * time.Second, Username: "root", Password: "123", }) if err != nil { panic(err) } defer rootClient.Close() // 获取角色 r 的信息 resp, err := rootClient.RoleGet(context.TODO(), "r") if err != nil { panic(err) } fmt.Printf("用户 u 的权限:起始 key %q,结束 key %q\n", resp.Perm[0].Key, resp.Perm[0].RangeEnd) // 关闭集群的身份认证 _, err = rootClient.AuthDisable(context.TODO()) if err != nil { panic(err) } }
结果:
已添加角色:root 已创建用户,用户名:root,密码:123 已为用户 root 授予 root 角色权限 已添加角色:r 角色 r 对节点 test 到 test3 添加写权限 已创建用户,用户名:u,密码:123 已为用户 u 授予角色 r 的权限 已为集群开启权限认证 {"level":"warn","ts":"2021-04-21T09:01:38.789+0800","caller":"[email protected]/retry_interceptor.go:62","msg":"retrying of unary invoker failed","target":"etcd-endpoints://0xc0003ba1c0/#initially=[172.17.0.2:2379;172.17.0.3:2379;172.17.0.4:2379]","attempt":0,"error":"rpc error: code = PermissionDenied desc = etcdserver: permission denied"} etcdserver: permission denied 用户 u 的权限:起始 key "test",结束 key "test3"
thepoy
0 条评论
thepoy
宣传栏
目录
一、用 docker 搭建集群
etcd 没有在 docker hub 中创建 image,所以自然拉取不到。
本文意在模拟使用步骤,所以创建三个 go 环境的容器,在每个容器中配置 etcd。
1 创建 go 容器
hub 中有 golang 镜像,可以直接拉取:
docker pull golang
拉取到的镜像是基于 debian buster 制作。
创建三个容器:
docker run -itd --name etcd1 golang docker run -itd --name etcd2 golang docker run -itd --name etcd3 golang
2 在每个镜像中 clone etcd
docker exec -it etcd1 bash docker exec -it etcd2 bash docker exec -it etcd3 bash
在每个容器中克隆:
git clone https://github.com/etcd-io/etcd.git
3 编绎 etcd
在每个容器中都需要执行下面的命令。
cd etcd ./build.sh
编绎脚本会拉取一些 golang 库,所以先设置好 goproxy 是非常有必要的。
go env -w GO111MODULE=on go env -w GOPROXY=https://goproxy.cn,direct
编绎完成后,会多一个 bin 目录,里面有两个可执行文件etcd
和etcdctl
,分别为服务端和客户端文件,搭建集群,使用的是etcd
。
4 写配置文件
etcd 的目录中有一个etcd.conf.yml.sample
是示例配置文件,将其复制一份用来作待使用的配置文件:
cp etcd.conf.yml.sample etcd.conf.yml
修改配置文件,下面只列出配置文件中需要修改的部分:
# 节点别名,给人类看的 name: 'infra3' # 数据文件存放的目录 data-dir: /var/lib/etcd # 用逗号分隔的 url 列表,用于与其他节点通信 listen-peer-urls: http://172.17.0.4:2380 # 用逗号分隔的 url 列表,用于与客户端通信 listen-client-urls: http://172.17.0.4:2379,http://localhost:2379 # 用逗号分隔的 url 列表,用于通知其他节点,与通信端口相同即可 initial-advertise-peer-urls: http://172.17.0.4:2380 # 用逗号分隔的 url 列表,用于公开通知客户端 advertise-client-urls: http://172.17.0.4:2379 # 初始化集群配置。集群各节点别名与其 url 的键值对列表 initial-cluster: infra1=http://172.17.0.2:2380,infra2=http://172.17.0.3:2380,infra3=http://172.17.0.4:2380 # 初始化集群 token initial-cluster-token: 'etcd-cluster-1'
5 使用配置文件运行各节点组成集群
在各个容器中分别执行:
bin/etcd --config-file etcd.conf.yml
集群应该已经正常运行了。
二、使用 clientv3 通信
1 客户端
客户端结构体Client
需要实现多个接口:
type Client struct { Cluster // Cluster 接口 KV // KV 接口 Lease // Lease 接口 Watcher // Watcher 接口 Auth // Auth 接口 Maintenance // Maintenance 接口 conn *grpc.ClientConn cfg Config creds grpccredentials.TransportCredentials resolver *resolver.EtcdManualResolver mu *sync.RWMutex ctx context.Context cancel context.CancelFunc // Username is a user name for authentication. Username string // Password is a password for authentication. Password string authTokenBundle credentials.Bundle callOpts []grpc.CallOption lgMu *sync.RWMutex lg *zap.Logger }
这些接口在创建Client
时实现:
func newClient(cfg *Config) (*Client, error) { ... client.Cluster = NewCluster(client) client.KV = NewKV(client) client.Lease = NewLease(client) client.Watcher = NewWatcher(client) client.Auth = NewAuth(client) client.Maintenance = NewMaintenance(client) ... }
2 写入
写入操作(put)包括创建和更新:key 不存在会创建,key 存在则更新。
Put
方法在 KV 接口中声明:
Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)
第一个参数是一个上下文,而Client
结构体内是有上下文属性的,在调用Put
方法时可以直接使用Client
的上下文属性,也可以定义一个新的上下文context.TODO()
、context.Background()
、context.WithCanle()
、context.WithTimeout()
等,根据实际需要定义上下文。
Client
的上下文属性会在client.Close()
时取消,而手动创建的上下文需要手动取消。
package main import ( "fmt" "time" clientv3 "go.etcd.io/etcd/client/v3" ) func main() { client, err := clientv3.New(clientv3.Config{ Endpoints: []string{"172.17.0.2:2379", "172.17.0.3:2379", "172.17.0.4:2379"}, DialTimeout: 5 * time.Second, }) if err != nil { panic(err) } defer client.Close() ctx := client.Ctx() // ctx, cancel := context.WithCancel(context.Background()) putResp, err := client.Put(ctx, "test", "test0") if err != nil { panic(err) } fmt.Println("response: ", putResp) fmt.Println("created or updated -> key: test, value: test0") // cancel() }
结果:
response: &{cluster_id:4577776708217222017 member_id:10491173242589593685 revision:22 raft_term:4 <nil> {} [] 0} created or updated -> key: test, value: test0
3 查询
与写入类似,调用Get
方法:
Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)
3.1 查询一个结果
func main() { ... ctx := client.Ctx() getResp, err := client.Get(ctx, "test") if err != nil { panic(err) } fmt.Println("get response:", getResp) }
结果:
get response: &{cluster_id:4577776708217222017 member_id:10491173242589593685 revision:2 raft_term:2 [key:"test" create_revision:2 mod_revision:2 version:1 value:"test0" ] false 1 {} [] 0}
3.2 查询包含前缀的多个结果
使用前缀查询时,就需要用到OpOption
参数了。
前缀查询的方法为:
func WithPrefix() OpOption { return func(op *Op) { if len(op.key) == 0 { op.key, op.end = []byte{0}, []byte{0} return } op.end = getPrefix(op.key) } }
向 etcd 中添加 test1 、 test2 、test3 后,使用前缀查询:
getResp, err := client.Get(ctx, "test", clientv3.WithPrefix()) if err != nil { panic(err) } for _, kv := range getResp.Kvs { fmt.Printf("%s = %s\n", kv.Key, string(kv.Value)) }
结果:
test = test0 test1 = 1 test2 = 2 test3 = 3
3.3 查询指定范围的多个结果
范围查询的方法为:
func WithRange(endKey string) OpOption { return func(op *Op) { op.end = []byte(endKey) } }
下面查询 test 到 test2 的结果:
getResp, err := client.Get(ctx, "test", clientv3.WithRange("test3")) // 不包含 test3 if err != nil { panic(err) } for _, kv := range getResp.Kvs { fmt.Printf("%s = %s\n", kv.Key, string(kv.Value)) }
结果:
test = test0 test1 = 1 test2 = 2
3.4 查询历史版本的值
这里就需要用到 put 或 get 响应中的Revision
属性了,代表每次修改的版本号,查询时指定想要查询的版本号即可查询对应历史版本的值。
版本号是全局版本号,不是某个值的版本号,任何一个值被创建、修改或删除,版本号都会增加 1 。
在 3.1 的响应中,我们能够看到每条查询结果中都有create_revision
和mod_revision
两个属性,分别对应的创建时的版本号和最后一次修改时的版本号,根据这两个版本号可以进行历史版本的查询:
getResp, err := client.Get(ctx, "test", clientv3.WithPrefix(), clientv3.WithRev(11)) if err != nil { panic(err) } for _, kv := range getResp.Kvs { fmt.Println(kv) }
需要注意的是,在响应中有一个属性revision
,代表当前最新版本号,要查询的版本号不能大于此值。
本例中,revision=12
,所以我们查询 11 号的值,结果为:
key:"test" create_revision:2 mod_revision:11 version:2 value:"00" key:"test1" create_revision:3 mod_revision:10 version:2 value:"10" key:"test2" create_revision:4 mod_revision:9 version:2 value:"20" key:"test3" create_revision:5 mod_revision:8 version:4 value:"30"
可以看到,查询到的结果最新版本号小于要查询的版本号时,返回的是最新版本的值。
3.5 查询大于等于某个键的值的所有结果
比如查询 key 大于等于 test2 的值:
getResp, err := client.Get(ctx, "test2", clientv3.WithFromKey()) if err != nil { panic(err) } for _, kv := range getResp.Kvs { fmt.Println(kv) }
结果:
key:"test2" create_revision:4 mod_revision:9 version:2 value:"20" key:"test3" create_revision:5 mod_revision:8 version:4 value:"30"
看一下WithFromKey()
方法的具体实现:
func WithFromKey() OpOption { return func(op *Op) { if len(op.key) == 0 { op.key = []byte{0} } op.end = []byte("\x00") } }
阅读Get
源码中可以知道,在调用Get
时,会初始化Op
结构体:
ret := Op{t: tRange, key: []byte(key)}
将传入的key
作为起始 key,WithFromKey()
将空字符串作为结束 key(意为无限大)。
由此可以查询到所有大于起始二进制 key 的 转为二进制后的 key。
3.6 其他 options
除上述 options 外,还有以下 options 可用:
WithSerializable
:让
Get
请求可序列化,能够降低服务端响应延迟
WithSort
:排序,需要与
WithPrefix
或
WithRange
合用,可以根据 key、version、revisions 或 value 进行排序
WithLimit
:限制查询结果的数量
WithLease
:为
Put
请求添加租约 id
WithKeysOnly
:只查询符合条件的所有 key,不查询对应的 value ...
源码中还有很多 options,不一一列举了,可以自行阅读源码:https://github.com/etcd-io/et...
4 租约
租约的意义就是节点的只能存活租约设置的时间。租约的存活时间 TTL 如果到期,租约就会过期,并且所有附带的节点都会被删除。
租约不影响集群版本。
4.1 批准租约
grantResp, err := client.Grant(ctx, 10) if err != nil { panic(err) } fmt.Println("新租约 ID:", grantResp.ID)
结果:
新租约 ID: 4635744352028668180
4.2 废除租约
可以使用租约 id 废除租约,废除时其附带的节点也会被删除。
revokeResp, err := client.Revoke(ctx, grantResp.ID) if err != nil { panic(err) } fmt.Println("租约已废除:", revokeResp)
结果:
租约已废除: &{cluster_id:4577776708217222017 member_id:10966956327032670700 revision:43 raft_term:2 {} [] 0}
4.3 获取租约剩余时间
租约过期时,剩余时间会是 -1。
liveResp, err := client.TimeToLive(ctx, grantResp.ID) if err != nil { panic(err) } fmt.Printf("剩余 %d 秒\n", liveResp.TTL) time.Sleep(6 * time.Second) liveResp, err = client.TimeToLive(ctx, grantResp.ID) if err != nil { panic(err) } fmt.Printf("剩余 %d 秒\n", liveResp.TTL)
结果:
新租约 ID: 2426447259829618775 剩余 4 秒 剩余 -1 秒
4.4 获取有效的所有租约
leasesResp, err := client.Leases(ctx) if err != nil { panic(err) } fmt.Println(leasesResp.Leases)
返回的Leases
结构体:
type LeaseStatus struct { ID LeaseID `json:"id"` // TODO: TTL int64 }
结果:
新租约 ID: 4635744352028668184 [{4635744352028668184}]
4.5 自动续约
自动续约KeepAlive
,会返回一个LeaseKeepAliveResponse
的只读管道,续约成功时后向这个管道内发送响应,我们只需要处理接收到的响应即可:
keepResp, err := client.KeepAlive(ctx, grantResp.ID) if err != nil { panic(err) } for resp := range keepResp { fmt.Println("续约成功:", resp) liveResp, err := client.TimeToLive(ctx, grantResp.ID) if err != nil { panic(err) } fmt.Printf("剩余 %d 秒\n", liveResp.TTL) }
结果:
续约成功: cluster_id:4577776708217222017 member_id:10966956327032670700 revision:43 raft_term:2 剩余 9 秒 续约成功: cluster_id:4577776708217222017 member_id:10966956327032670700 revision:43 raft_term:2 剩余 9 秒 续约成功: cluster_id:4577776708217222017 member_id:10966956327032670700 revision:43 raft_term:2 ...
4.6 续约一次
time.Sleep(6 * time.Second) liveResp, err := client.TimeToLive(ctx, grantResp.ID) if err != nil { panic(err) } fmt.Printf("剩余 %d 秒\n", liveResp.TTL) keepResp, err := client.KeepAliveOnce(ctx, grantResp.ID) if err != nil { panic(err) } fmt.Println("续约成功:", keepResp) liveResp, err = client.TimeToLive(ctx, grantResp.ID) if err != nil { panic(err) } fmt.Printf("剩余 %d 秒\n", liveResp.TTL)
结果:
剩余 3 秒 续约成功: cluster_id:4577776708217222017 member_id:10491173242589593685 revision:43 raft_term:2 剩余 9 秒
4.7 废除当前客户端批准的所有租约
释放所有租约的方法为:
client.Lease.Close()
但我测试时发现调用这个方法后,没有一个租约被释放,所以此条待完善,未能发现是我使用的问题,还是代码的问题。
5 删除
可以删除一个或符合条件的 key。
5.1 删除一个
delResp, err := client.Delete(ctx, "test") if err != nil { panic(err) } fmt.Println(delResp)
没有报错即为删除成功,但此处需要注意,删除一个不存在的键并不会报错,只是版本号不会发生变化,响应中的Deleted
属性值为 0。
5.2 删除多个
delResp, err := client.Delete(ctx, "test", clientv3.WithPrefix()) if err != nil { panic(err) } fmt.Printf("删除了 %d 个值\n", delResp.Deleted)
options 与Get
中类似。
结果:
删除了 4 个值
6 监听
etcd 的监听是Watch
方法。
6.1 监听一个节点
以监听test
节点为例:
wch := client.Watch(ctx, "test") for e := range wch { fmt.Println(e.Events[0]) }
监听后,会返回一个装有WatchResponse
的管道,我们只需要遍历这个管道,即可监听test
节点的所有变动。
WatchResponse
结构体中存储节点变动的是一个事件的切片,监听一个节点时,里面不会有多个事件。
运行程序后,用其他程序修改test
节点的值,监听程序会得到如下输出:
&{PUT key:"test" create_revision:24 mod_revision:27 version:4 value:"510" <nil> {} [] 0} &{PUT key:"test" create_revision:24 mod_revision:28 version:5 value:"10" <nil> {} [] 0} &{DELETE key:"test" mod_revision:29 <nil> {} [] 0}
6.2 监听多个节点
监听也可以使用 option,参考第 3 节中关于 option 的描述。
wch := client.Watch(ctx, "test", clientv3.WithPrefix()) for resp := range wch { for _, e := range resp.Events { // 监听多个节点,可能会有多个事件,所以这里对事件切片进行遍历 fmt.Println(e) } }
结果:
&{PUT key:"test" create_revision:37 mod_revision:38 version:2 value:"110" <nil> {} [] 0} &{PUT key:"test1" create_revision:34 mod_revision:39 version:2 value:"111" <nil> {} [] 0} &{DELETE key:"test" mod_revision:40 <nil> {} [] 0} &{DELETE key:"test1" mod_revision:40 <nil> {} [] 0} &{DELETE key:"test2" mod_revision:40 <nil> {} [] 0} &{DELETE key:"test3" mod_revision:40 <nil> {} [] 0}
6.3 从节点的指定历史版本开始监听
默认的监听是监听当前版本之后的变化,但一些场景下需要从历史版本开始监听。
比如,一个程序一直在监听 etcd 的某一个节点,运行过程中,程序发生异常退出,程序退出后到重新启动前是无法获取 etcd 的节点的所有改动的。如果就这样启动程序,程序获取到的是最新的版本,与程序退出前需要的参数并不一致,必然会引起其他问题,导致程序不能像预期一样运行。所以,程序应该在启动时,从退出前获得的最后一个版本号开始监听。
示例:
程序退出前,test
节点的版本号是 38,但随着其他程序的调用更新,当前集群的版本号已经被更新到了 40,程序重新启动时,就需要从 38 号开始监听:
wch := client.Watch(ctx, "test", clientv3.WithRev(38)) for resp := range wch { for _, e := range resp.Events { fmt.Println(e) } }
历史监听启动时会立即获取到从 38 到 40,test
节点的所有改动:
&{PUT key:"test" create_revision:37 mod_revision:38 version:2 value:"110" <nil> {} [] 0} &{DELETE key:"test" mod_revision:40 <nil> {} [] 0}
7 压缩版本号
etcd 默认情况下会保存历史版本号以便程序可以读取历史版本,但如果不加以控制,版本号会无限叠加,历史数据也会无限保存,这样的话就会产生大量的无用的历史数据。
为了避免这个问题,etcd 提供了压缩功能:删除指定版本之前的历史版本和历史数据,被删除的所有数据将无法访问。
比如,当前版本号是 43,我们想删除 42 之前的数据:
comResp, err := client.Compact(ctx, 42) if err != nil { panic(err) } fmt.Println(comResp) // 获取 42 之前的版本 getResp, err := client.Get(ctx, "test", clientv3.WithRev(40)) if err != nil { panic(err) } fmt.Println(getResp)
结果:
&{cluster_id:4577776708217222017 member_id:10491173242589593685 revision:42 raft_term:2 {} [] 0} {"level":"warn","ts":"2021-04-20T18:50:53.431+0800","caller":"[email protected]/retry_interceptor.go:62","msg":"retrying of unary invoker failed","target":"etcd-endpoints://0xc000140380/#initially=[172.17.0.2:2379;172.17.0.3:2379;172.17.0.4:2379]","attempt":0,"error":"rpc error: code = OutOfRange desc = etcdserver: mvcc: required revision has been compacted"} panic: etcdserver: mvcc: required revision has been compacted
可以看到,40 版本号已经被压缩,无法再访问。
8 事务
etcd 中事务是原子性过程,只支持If().Then().Else().Commit()
这种表达。
If
中支持传入多个比较条件Cmp
,如果条件都满足,执行Then
中的Op
,否则执行Else
中的Op
,最后Commit
。
四舍五入示例:
如果test
的值小于 5,将test
的值改为 0,否则改为 10。
txn := client.Txn(ctx) getResp, err := client.Get(ctx, "test") if err != nil { panic(err) } fmt.Println("事务前的值:", string(getResp.Kvs[0].Value)) txnResp, err := txn.If(clientv3.Compare(clientv3.Value("test"), "<", "5")). Then(clientv3.OpPut("test", "0")). Else(clientv3.OpPut("test", "9")). Commit() if err != nil { panic(err) } if txnResp.Succeeded { fmt.Println("小于 5,舍") } else { fmt.Println("大于 5,入") } getResp, err = client.Get(ctx, "test") if err != nil { panic(err) } fmt.Println("事务提交成功后的值:", string(getResp.Kvs[0].Value))
结果:
事务前的值: 5 大于 5,入 事务提交成功后的值: 9
9 用户和角色 / Auth 身份认证
etcd 中的用户可以授予角色权限。root 用户
auth 的完整示例(选自官方 example 文件):
unc main() { endpoints := []string{"172.17.0.2:2379", "172.17.0.3:2379", "172.17.0.4:2379"} client, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, DialTimeout: 5 * time.Second, }) if err != nil { panic(err) } defer client.Close() // 添加角色 root _, err = client.RoleAdd(context.TODO(), "root") if err != nil { panic(err) } fmt.Println("已添加角色:root") // 添加用户 _, err = client.UserAdd(context.TODO(), "root", "123") if err != nil { panic(err) } fmt.Println("已创建用户,用户名:root,密码:123") // 为用户授 root 授予 root 角色权限 _, err = client.UserGrantRole(context.TODO(), "root", "root") if err != nil { panic(err) } fmt.Println("已为用户 root 授予 root 角色权限") // 添加角色 r _, err = client.RoleAdd(context.TODO(), "r") if err != nil { panic(err) } fmt.Println("已添加角色:r") // 为角色 r 设置权限,对 [test, test3) 有写权限 _, err = client.RoleGrantPermission( context.TODO(), "r", "test", "test3", clientv3.PermissionType(clientv3.PermWrite), ) if err != nil { panic(err) } fmt.Println("角色 r 对节点 test 到 test3 添加写权限") // 添加用户 u _, err = client.UserAdd(context.TODO(), "u", "123") if err != nil { panic(err) } fmt.Println("已创建用户,用户名:u,密码:123") // 为用户 u 授予 角色 r 的权限 _, err = client.UserGrantRole(context.TODO(), "u", "r") if err != nil { panic(err) } fmt.Println("已为用户 u 授予角色 r 的权限") // 开启集权的身份认证 _, err = client.AuthEnable(context.TODO()) if err != nil { panic(err) } fmt.Println("已为集群开启权限认证") // 以指定用户创建客户端 authClient, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, DialTimeout: 5 * time.Second, Username: "u", Password: "123", }) if err != nil { panic(err) } defer authClient.Close() // 添加或修改节点 test1 _, err = authClient.Put(context.TODO(), "test1", "1") if err != nil { panic(err) } // 开启事务 _, err = authClient.Txn(context.TODO()). If(clientv3.Compare(clientv3.Value("test4"), ">", "1")). Then(clientv3.OpPut("test4", "yes")). Else(clientv3.OpPut("test4", "no")). Commit() fmt.Println(err) // 以 root 用户创建客户端 rootClient, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, DialTimeout: 5 * time.Second, Username: "root", Password: "123", }) if err != nil { panic(err) } defer rootClient.Close() // 获取角色 r 的信息 resp, err := rootClient.RoleGet(context.TODO(), "r") if err != nil { panic(err) } fmt.Printf("用户 u 的权限:起始 key %q,结束 key %q\n", resp.Perm[0].Key, resp.Perm[0].RangeEnd) // 关闭集群的身份认证 _, err = rootClient.AuthDisable(context.TODO()) if err != nil { panic(err) } }
结果:
已添加角色:root 已创建用户,用户名:root,密码:123 已为用户 root 授予 root 角色权限 已添加角色:r 角色 r 对节点 test 到 test3 添加写权限 已创建用户,用户名:u,密码:123 已为用户 u 授予角色 r 的权限 已为集群开启权限认证 {"level":"warn","ts":"2021-04-21T09:01:38.789+0800","caller":"[email protected]/retry_interceptor.go:62","msg":"retrying of unary invoker failed","target":"etcd-endpoints://0xc0003ba1c0/#initially=[172.17.0.2:2379;172.17.0.3:2379;172.17.0.4:2379]","attempt":0,"error":"rpc error: code = PermissionDenied desc = etcdserver: permission denied"} etcdserver: permission denied 用户 u 的权限:起始 key "test",结束 key "test3"