etcd是什么?
etcd是一个用Go语言写的,用于分布式系统下高性能的键值(K-V)存储、服务发现、负载均衡、、分布式锁、配置管理等场景地应用,类似于Java的zookeeper。基于Raft协议,能保证数据的一致性。
官方地址
[etcd.io] https://etcd.io
[github.com]https://github.com/etcd-io/etcd
etcd的安装
有两种方式安装,可以通过编译好的二进制或源码安装,我这里用的是二进制安装的,另一个源码安装的我没有试过。
另外,这里的安装也不是集群方式安装的,用的单结点的,只是一些基本的操作。
一、二进制安装
[https://etcd.io/docs/v3.4.0/integrations/] https://etcd.io/docs/v3.4.0/integrations/
$ ls ~/Applications/etcd Documentation README-etcdctl.md README.md READMEv2-etcdctl.md etcd etcdctl
二、源码安装
因为是Go写的,可直接使用 go get https://github.com/etcd-io/etcd
下载就行了,不过这个etcd包有点大,如果网络不好,还不一定能下载下来,凡正我是没整下来。
所以,如果下载不下来,可以尝试用github的zip打包下载试试,如果也还是不行,那么可用国内的下载加速。
我在网上找到一个 [https://www.newbe.pro/Mirrors/Mirrors-Etcd/] https://www.newbe.pro/Mirrors/Mirrors-Etcd/,从这里面直接把压缩包下载下来,放到你的Gopath下就行了。
接下来进入到etcd的目录下面,执行 ./build
构建就行了。
etcd启动
不管用那种方式,要启动etcd,先找到etcd这个文件,然后执行它.默认在你本机的2379端口,如果你要在公网开启,指定下参数就行了.
$ ~/Applications/etcd/etcd --advertise-client-urls="https://123.33.44.34:2379" --listen-client-urls="https://123.33.44.34:2379"
本地如下方式启动
$ ~/Applications/etcd/etcd [WARNING] Deprecated '--logger=capnslog' flag is set; use '--logger=zap' flag instead 2020-03-11 12:55:38.289362 I | etcdmain: etcd Version: 3.4.4 2020-03-11 12:55:38.289461 I | etcdmain: Git SHA: c65a9e2dd 2020-03-11 12:55:38.289468 I | etcdmain: Go Version: go1.12.12 2020-03-11 12:55:38.289476 I | etcdmain: Go OS/Arch: darwin/amd64 2020-03-11 12:55:38.289482 I | etcdmain: setting maximum number of CPUs to 4, total number of available CPUs is 4 2020-03-11 12:55:38.289492 N | etcdmain: failed to detect default host (default host not supported on darwin_amd64) 2020-03-11 12:55:38.289500 W | etcdmain: no data-dir provided, using default data-dir ./default.etcd [WARNING] Deprecated '--logger=capnslog' flag is set; use '--logger=zap' flag instead 2020-03-11 12:55:38.292027 I | embed: name = default 2020-03-11 12:55:38.292038 I | embed: data dir = default.etcd 2020-03-11 12:55:38.292045 I | embed: member dir = default.etcd/member 2020-03-11 12:55:38.292050 I | embed: heartbeat = 100ms 2020-03-11 12:55:38.292054 I | embed: election = 1000ms 2020-03-11 12:55:38.292058 I | embed: snapshot count = 100000 2020-03-11 12:55:38.292079 I | embed: advertise client URLs = http://localhost:2379 2020-03-11 12:55:38.407474 I | etcdserver: starting member 8e9e05c52164694d in cluster cdf818194e3a8c32
好了,看到上面的输出,服务端就启动好了,如果要测试,可以用etcdctl测试下。
etcdctl put "task/task1" "task任务" OK etcdctl get "task/task1" task/task1 task任务
代码操作
这里用Go对etcd进行一些简单的操作,go的客户端可以在官网上去找[https://etcd.io/docs/v3.4.0/integrations/] https://etcd.io/docs/v3.4.0/integrations/
我这里用的是clientv3这个包, 这个包在上面源码安装时,下载的包里面就有,那个包是服务端和客户端代码都有的。
Get操作
package main import ( "context" "fmt" "go.etcd.io/etcd/clientv3" "time" ) func main(){ var ( config clientv3.Config client *clientv3.Client kv clientv3.KV resp *clientv3.GetResponse err error ) config = clientv3.Config{ Endpoints:[]string{"localhost:2379"}, DialTimeout:5 * time.Second, } if client,err = clientv3.New(config); err != nil { fmt.Println(err) return } kv = clientv3.NewKV(client) if resp, err = kv.Get(context.TODO(),"cron/jobs/",clientv3.WithPrefix()); err != nil { fmt.Println(err) return } for k,v := range resp.Kvs { fmt.Println(k,v,string(v.Key), string(v.Value)) } }
续约操作
package main import ( "context" "fmt" "go.etcd.io/etcd/clientv3" "time" ) func main(){ var ( config clientv3.Config client *clientv3.Client Lease clientv3.Lease leaseGrantResp *clientv3.LeaseGrantResponse leaseId clientv3.LeaseID leaseKeepResp *clientv3.LeaseKeepAliveResponse leaseKeepRespChan <-chan *clientv3.LeaseKeepAliveResponse getResp *clientv3.GetResponse putResp *clientv3.PutResponse kv clientv3.KV err error ) config = clientv3.Config{ Endpoints:[]string{"localhost:2379"}, DialTimeout:5 * time.Second, } if client,err = clientv3.New(config); err != nil { fmt.Println(err) return } Lease = clientv3.NewLease(client) if leaseGrantResp, err = Lease.Grant(context.TODO(), 10); err != nil { fmt.Println(err) } leaseId = leaseGrantResp.ID ctx, _ := context.WithTimeout(context.TODO(), 10 * time.Second) //10秒停止续约 if leaseKeepRespChan, err = Lease.KeepAlive(ctx,leaseId); err != nil { fmt.Println(err) } //启动协程,进行续约 go func(){ for { select { case leaseKeepResp = <-leaseKeepRespChan: if leaseKeepResp == nil{ fmt.Println("租约失效了") goto END } else { fmt.Println("收到续约", leaseKeepResp) } } } END: }() kv = clientv3.NewKV(client) if putResp ,err = kv.Put(context.TODO(),"cron/jobs/job3","job3",clientv3.WithLease(leaseId)); err != nil { fmt.Println(err) } else { fmt.Println("写入成功", putResp.Header.Revision) } for { if getResp, err = kv.Get(context.TODO(),"cron/jobs/job3"); err != nil { fmt.Println(err) } if getResp.Count != 0 { fmt.Println(getResp.Kvs) } else { fmt.Println(" 过期了") break } time.Sleep(2 * time.Second) } }
Op操作
package main import ( "context" "fmt" "go.etcd.io/etcd/clientv3" "time" ) func main(){ var ( config clientv3.Config client *clientv3.Client opPut clientv3.Op opGet clientv3.Op opResp clientv3.OpResponse kv clientv3.KV err error ) config = clientv3.Config{ Endpoints:[]string{"localhost:2379"}, DialTimeout:5 * time.Second, } if client,err = clientv3.New(config); err != nil { fmt.Println(err) return } kv = clientv3.NewKV(client) opPut = clientv3.OpPut("cron/jobs/job3","job3333") if opResp, err = kv.Do(context.TODO(),opPut); err != nil { fmt.Println(err) return } fmt.Println("获取revision", opResp.Put().Header.Revision) opGet = clientv3.OpGet("cron/jobs/job3") if opResp ,err = kv.Do(context.TODO(),opGet); err != nil { fmt.Println(err) return } if len(opResp.Get().Kvs) != 0 { fmt.Println("获取值:", string(opResp.Get().Kvs[0].Value)) } }
简单分布式锁
package main import ( "context" "fmt" "go.etcd.io/etcd/clientv3" "time" ) //简单分布式锁 func main(){ var ( config clientv3.Config client *clientv3.Client /*opPut clientv3.Op opGet clientv3.Op opResp clientv3.OpResponse*/ kv clientv3.KV txn clientv3.Txn txnResp *clientv3.TxnResponse Lease clientv3.Lease leaseGrantResp *clientv3.LeaseGrantResponse leaseId clientv3.LeaseID leaseKeepResp *clientv3.LeaseKeepAliveResponse leaseKeepRespChan <-chan *clientv3.LeaseKeepAliveResponse ctx context.Context cancelFunc context.CancelFunc err error ) config = clientv3.Config{ Endpoints:[]string{"localhost:2379"}, DialTimeout:5 * time.Second, } if client,err = clientv3.New(config); err != nil { fmt.Println(err) return } //租约操作 Lease = clientv3.NewLease(client) if leaseGrantResp, err = Lease.Grant(context.TODO(), 5); err != nil { fmt.Println(err) } leaseId = leaseGrantResp.ID ctx, cancelFunc = context.WithCancel(context.TODO()) defer cancelFunc() defer Lease.Revoke(context.TODO(),leaseId) if leaseKeepRespChan, err = Lease.KeepAlive(ctx,leaseId); err != nil { fmt.Println(err) } //启动协程,进行续约 go func(){ for { select { case leaseKeepResp = <-leaseKeepRespChan: if leaseKeepResp == nil{ fmt.Println("租约失效了") goto END } else { fmt.Println("收到续约", leaseKeepResp) } } } END: }() kv = clientv3.NewKV(client) //开启事务 txn = kv.Txn(context.TODO()) txn.If(clientv3.Compare(clientv3.CreateRevision("cron/lock/job3"),"=", 0)). Then(clientv3.OpPut("cron/lock/job3","123",clientv3.WithLease(leaseId))). Else(clientv3.OpGet("cron/lock/job3")) if txnResp,err = txn.Commit(); err != nil { fmt.Println(err) return } if !txnResp.Succeeded { fmt.Println("OOH,锁被占用了:", string(txnResp.Responses[0].GetResponseRange().Kvs[0].Value)) return } else { fmt.Println("抢到了") } fmt.Println("处理业务") time.Sleep( 10 * time.Second) }
以上就是记录我在学习etcd的基本入门和一些常用的操作,希望大家能喜欢,期待和大家的一些进步。
Copyright © 2004-2024 Ynicp.com 版权所有 法律顾问:建纬(昆明)律师事务所 昆明市网翼通科技有限公司 滇ICP备08002592号-4