Go语言操作etcd,这里推荐官方包etcd/clientv3。 文档:
https://pkg.go.dev/go.etcd.io/etcd/clientv3 etcd v3使用gRPC进行远程过程调用。和clientv3使用grpc-go连接到etcd。确保在使用客户端后关闭该客户端。如果客户端未关闭,连接将有泄漏的程序。若要指定客户端请求超时,请传递上下文,设置context.WithTimeout。
安装
go get go.etcd.io/etcd/client/v3
put、get操作
示例
package main
import (
"context"
"fmt"
clientv3 "go.etcd.io/etcd/client/v3"
"time"
)
func main() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"localhost:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
// handle error!
fmt.Printf("连接etcd失败%v\n",err)
}
defer cli.Close()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
_,err = cli.Put(ctx, "testkey", "testvalues")
cancel()
if err != nil {
// handle error!
fmt.Printf("put操作失败%v\n",err)
}
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
resp, err := cli.Get(ctx, "testkey")
cancel()
if err != nil {
// handle error!
fmt.Printf("get操作失败%v\n",err)
}
// use the response
fmt.Println(resp)
for _, kv := range resp.Kvs {
fmt.Printf("%s,%s",kv.Key,kv.Value)
}
}
watch 操作
watch监控某个key的变化 示例
package main
import (
"context"
"fmt"
clientv3 "go.etcd.io/etcd/client/v3"
"time"
)
func main() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"localhost:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
// handle error!
fmt.Printf("连接etcd失败%v\n",err)
}
defer cli.Close()
rch := cli.Watch(context.Background(), "testkey")
for wresp := range rch {
for _, ev := range wresp.Events {
fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
}
}
}
可在修改testkey值,观察其变化
删除
示例
package main
import (
"context"
"fmt"
clientv3 "go.etcd.io/etcd/client/v3"
"time"
)
func main() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"localhost:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
// handle error!
fmt.Printf("连接etcd失败%v\n",err)
}
defer cli.Close()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
resp, err := cli.Delete(ctx, "testkey")
cancel()
if err != nil {
// handle error!
fmt.Printf("delete操作失败%v\n",err)
}
fmt.Println(resp.Deleted) //是否被删除
}
lease租约
示例
package main
import (
"context"
"fmt"
clientv3 "go.etcd.io/etcd/client/v3"
"time"
)
func main() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"localhost:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
// handle error!
fmt.Printf("连接etcd失败%v\n",err)
}
defer cli.Close()
//设置租约为5秒
resp, err := cli.Grant(context.TODO(), 5)
if err != nil {
fmt.Printf("设置lease失败%v\n",err)
}
// 5秒后 ‘foo’ 将被移除
r, err := cli.Put(context.TODO(), "foo", "bar", clientv3.WithLease(resp.ID))
if err != nil {
fmt.Printf("lease失败%v\n",err)
}
fmt.Println(r.OpResponse())
}
keep-alive查看租约是否存活
示例
package main
import (
"context"
"fmt"
clientv3 "go.etcd.io/etcd/client/v3"
"log"
"time"
)
func main() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"localhost:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
// handle error!
fmt.Printf("连接etcd失败%v\n",err)
}
defer cli.Close()
//设置租约为5秒
resp, err := cli.Grant(context.TODO(), 5)
if err != nil {
fmt.Printf("设置lease失败%v\n",err)
}
// 5秒后 ‘foo’ 将被移除
r, err := cli.Put(context.TODO(), "foo", "bar", clientv3.WithLease(resp.ID))
if err != nil {
fmt.Printf("lease失败%v\n",err)
}
fmt.Println(r.OpResponse())
// the key 'foo' will be kept forever
ch, kaerr := cli.KeepAlive(context.TODO(), resp.ID)
if kaerr != nil {
log.Fatal(kaerr)
}
ka := <-ch
fmt.Println("ttl:", ka.TTL)
}
基于etcd实现分布式锁
示例
package main
import (
"context"
"fmt"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
"time"
)
func main() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"localhost:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
// handle error!
fmt.Printf("连接etcd失败%v\n",err)
}
defer cli.Close()
//为演示竞争锁,创建两个会话
s1, err := concurrency.NewSession(cli)
if err != nil {
fmt.Println(err)
}
defer s1.Close()
m1 := concurrency.NewMutex(s1, "/my-lock/")
s2, err := concurrency.NewSession(cli)
if err != nil {
fmt.Println(err)
}
defer s2.Close()
m2 := concurrency.NewMutex(s2, "/my-lock/")
// s1获取锁
if err := m1.Lock(context.TODO()); err != nil {
fmt.Println(err)
}
fmt.Println("s1获取锁")
m2Locked := make(chan struct{})
go func() {
defer close(m2Locked)
// 等待s1释放锁 /my-lock/
if err := m2.Lock(context.TODO()); err != nil {
fmt.Println(err)
}
}()
if err := m1.Unlock(context.TODO()); err != nil {
fmt.Println(err)
}
fmt.Println("s1释放锁")
<-m2Locked
fmt.Println("s2获取锁")
}
etcd分布式锁和redis实现分布式锁的对比:
redis
优点:基于主从同步和哨兵实现,具有高可用 基于内存,读写速度快高性能
缺点:
- 存在死锁问题,业务逻辑出错或超时等原因,没有及时释放锁,出现死锁。
- 锁过期时间设置不合理,导致大量锁竞争。
etcd
- 原子性 etcd是基于raft实现的,具有强一致性的kv存储。
- 高性能 每个节点都可以处理大量的读写请求。
- 可扩展性 可以增加节点扩容。
- 高可用性 节点冗余,单节点出故障,不影响整个集群。
缺点:
- 集群的配置相对复杂
本文暂时没有评论,来添加一个吧(●'◡'●)