一 背景
在读源码的时候,读到了一个 go 项目与 etcd 交互的封装。这里就行记录留存下,同时学习 etcd 的一致性原则。
二 ETCD交互封装代码
package model import ( "context" "time" "github.com/golang/protobuf/proto" "github.com/prometheus/client_golang/prometheus" "go.etcd.io/etcd/api/v3/mvccpb" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" ) type EtcdModel interface { Add(ctx context.Context, key string, message proto.Message) (int64, error) AddWithLease(ctx context.Context, key string, m proto.Message, leaseID clientv3.LeaseID) (int64, error) Update(ctx context.Context, key string, message proto.Message, preRevision int64) (int64, error) UpdateIgnoreRevision(ctx context.Context, key string, m proto.Message) (int64, error) Get(ctx context.Context, key string, message proto.Message) (int64, error) Put(ctx context.Context, key string, m proto.Message) (int64, error) PutWithLease(ctx context.Context, key string, m proto.Message, leaseID clientv3.LeaseID) (int64, error) DeleteIgnoreRevision(ctx context.Context, key string, prev proto.Message) (int64, error) GrantLease(ctx context.Context, ttl int64) (clientv3.LeaseID, error) KeepAliveLease(ctx context.Context, id clientv3.LeaseID) error RevokeLease(ctx context.Context, id clientv3.LeaseID) error Transaction(ctx context.Context, op *TransactionOperator) (int64, error) } type EtcdModelImpl struct { logger *zap.Logger client *clientv3.Client queryTimeout time.Duration watcher *EtcdWatcher } func NewEtcdModel(logger *zap.Logger, client *clientv3.Client, queryTimeout time.Duration, watcher *EtcdWatcher) *EtcdModelImpl { return &EtcdModelImpl{ logger: logger, client: client, queryTimeout: queryTimeout, watcher: watcher, } } func (em *EtcdModelImpl) Add(ctx context.Context, key string, message proto.Message) (int64, error) { value, err := proto.Marshal(message) if err != nil { return 0, err } c, cancel := context.WithTimeout(ctx, em.queryTimeout) defer cancel() req := clientv3.OpPut(key, string(value)) cond := clientv3.Compare(clientv3.Version(key), "=", 0) resp, err := withLatency("put", func() (*clientv3.TxnResponse, error) { return em.client.Txn(c).If(cond).Then(req).Commit() }) if err != nil { return 0, err } if !resp.Succeeded { return 0, &util.EtcdKeyExistsError{Key: key} } return resp.Header.Revision, nil } func (em *EtcdModelImpl) AddWithLease(ctx context.Context, key string, message proto.Message, leaseID clientv3.LeaseID) (int64, error) { value, err := proto.Marshal(message) if err != nil { return 0, err } c, cancel := context.WithTimeout(ctx, em.queryTimeout) defer cancel() req := clientv3.OpPut(key, string(value), clientv3.WithLease(leaseID)) cond := clientv3.Compare(clientv3.Version(key), "=", 0) resp, err := withLatency("put", func() (*clientv3.TxnResponse, error) { return em.client.Txn(c).If(cond).Then(req).Commit() }) if err != nil { return 0, err } if !resp.Succeeded { return 0, &util.EtcdKeyExistsError{Key: key} } return resp.Header.Revision, nil } func (em *EtcdModelImpl) Update(ctx context.Context, key string, message proto.Message, preRevision int64) (int64, error) { value, err := proto.Marshal(message) if err != nil { return 0, err } c, cancel := context.WithTimeout(ctx, em.queryTimeout) defer cancel() req := clientv3.OpPut(key, string(value), clientv3.WithPrevKV()) cond := clientv3.Compare(clientv3.ModRevision(key), "=", preRevision) resp, err := withLatency("put", func() (*clientv3.TxnResponse, error) { return em.client.Txn(c).If(cond).Then(req).Commit() }) if err != nil { return 0, err } if !resp.Succeeded { return 0, &util.RevisionConflictError{Given: preRevision} } return resp.Header.Revision, nil } func (em *EtcdModelImpl) UpdateIgnoreRevision(ctx context.Context, key string, message proto.Message) (int64, error) { value, err := proto.Marshal(message) if err != nil { return 0, err } c, cancel := context.WithTimeout(ctx, em.queryTimeout) defer cancel() req := clientv3.OpPut(key, string(value), clientv3.WithPrevKV()) cond := clientv3.Compare(clientv3.ModRevision(key), "!=", 0) resp, err := withLatency("put", func() (*clientv3.TxnResponse, error) { return em.client.Txn(c).If(cond).Then(req).Commit() }) if err != nil { return 0, err } if !resp.Succeeded { return 0, &util.EtcdKeyExistsError{Key: key} } return resp.Header.Revision, nil } func (em *EtcdModelImpl) DeleteIgnoreRevision(ctx context.Context, key string, prev proto.Message) (int64, error) { c, cancel := context.WithTimeout(ctx, em.queryTimeout) defer cancel() resp, err := withLatency("delete", func() (*clientv3.DeleteResponse, error) { return em.client.Delete(c, key, clientv3.WithPrevKV()) }) if err != nil { return 0, err } if resp.Deleted <= 0 { return 0, &util.EtcdKeyNotFoundError{Key: key} } if prev != nil { if err := proto.Unmarshal(resp.PrevKvs[0].Value, prev); err != nil { return 0, err } } return resp.Header.Revision, nil } func (em *EtcdModelImpl) Get(ctx context.Context, key string, message proto.Message) (int64, error) { c, cancel := context.WithTimeout(ctx, em.queryTimeout) defer cancel() resp, err := withLatency("get", func() (*clientv3.GetResponse, error) { return em.client.Get(c, key) }) if err != nil { return 0, err } if len(resp.Kvs) == 0 { return 0, &util.EtcdKeyNotFoundError{Key: key} } if err := proto.Unmarshal(resp.Kvs[0].Value, message); err != nil { return 0, err } return resp.Kvs[0].ModRevision, nil } func (em *EtcdModelImpl) Put(ctx context.Context, key string, m proto.Message) (int64, error) { value, err := proto.Marshal(m) if err != nil { return 0, err } c, cancel := context.WithTimeout(ctx, em.queryTimeout) defer cancel() resp, err := withLatency("put", func() (*clientv3.PutResponse, error) { return em.client.Put(c, key, string(value)) }) if err != nil { return 0, err } return resp.Header.Revision, nil } func (em *EtcdModelImpl) PutWithLease(ctx context.Context, key string, m proto.Message, leaseID clientv3.LeaseID) (int64, error) { value, err := proto.Marshal(m) if err != nil { return 0, err } c, cancel := context.WithTimeout(ctx, em.queryTimeout) defer cancel() resp, err := withLatency("put", func() (*clientv3.PutResponse, error) { return em.client.Put(c, key, string(value), clientv3.WithLease(leaseID)) }) if err != nil { return 0, err } return resp.Header.Revision, nil } func (em *EtcdModelImpl) GrantLease(ctx context.Context, ttl int64) (clientv3.LeaseID, error) { c, cancel := context.WithTimeout(ctx, em.queryTimeout) defer cancel() resp, err := withLatency("grant", func() (*clientv3.LeaseGrantResponse, error) { return em.client.Grant(c, ttl) }) if err != nil { return 0, err } return resp.ID, nil } func (em *EtcdModelImpl) KeepAliveLease(ctx context.Context, id clientv3.LeaseID) error { c, cancel := context.WithTimeout(ctx, em.queryTimeout) defer cancel() _, err := withLatency("keep_alive", func() (*clientv3.LeaseKeepAliveResponse, error) { return em.client.KeepAliveOnce(c, id) }) return err } func (em *EtcdModelImpl) RevokeLease(ctx context.Context, id clientv3.LeaseID) error { c, cancel := context.WithTimeout(ctx, em.queryTimeout) defer cancel() _, err := withLatency("revoke", func() (*clientv3.LeaseRevokeResponse, error) { return em.client.Revoke(c, id) }) return err } func (em *EtcdModelImpl) Transaction(ctx context.Context, op *TransactionOperator) (int64, error) { var conditions []clientv3.Cmp var ops []clientv3.Op updatedMessages := make(map[string][]byte) for _, m := range op.Adds { conditions = append(conditions, clientv3.Compare(clientv3.Version(m.Key), "=", 0)) value, err := proto.Marshal(m.Message) if err != nil { return 0, err } updatedMessages[m.Key] = value ops = append(ops, clientv3.OpPut(m.Key, string(value))) } for _, m := range op.Updates { if m.Revision > 0 { conditions = append(conditions, clientv3.Compare(clientv3.ModRevision(m.Key), "=", m.Revision)) } value, err := proto.Marshal(m.Message) if err != nil { return 0, err } updatedMessages[m.Key] = value ops = append(ops, clientv3.OpPut(m.Key, string(value))) } for _, m := range op.Deletes { if m.Revision > 0 { conditions = append(conditions, clientv3.Compare(clientv3.ModRevision(m.Key), "=", m.Revision)) } ops = append(ops, clientv3.OpDelete(m.Key, clientv3.WithPrevKV())) } if len(ops) == 0 { return 0, nil } c, cancel := context.WithTimeout(ctx, em.queryTimeout) defer cancel() resp, err := withLatency("transaction", func() (*clientv3.TxnResponse, error) { return em.client.Txn(c).If(conditions...).Then(ops...).Commit() }) if err != nil { return 0, err } if !resp.Succeeded { return 0, &util.RevisionConflictError{} } // This is a fast path to dispatch etcd events to watchers. // It's necessary as the scheduler relies on the integrity of the watching indexes. var events []EtcdEvent for _, r := range resp.Responses { if rr := r.GetResponseDeleteRange(); rr != nil { for _, kv := range rr.PrevKvs { events = append(events, EtcdEvent{ EventType: mvccpb.DELETE, Key: string(kv.Key), Value: kv.Value, Revision: resp.Header.Revision, }) } } } for key, value := range updatedMessages { events = append(events, EtcdEvent{ EventType: mvccpb.PUT, Key: key, Value: value, Revision: resp.Header.Revision, }) } em.watcher.dispatchEvents(events) return resp.Header.Revision, nil } func withLatency[R any](method string, f func() (R, error)) (R, error) { start := time.Now() resp, err := f() latency := time.Since(start) if err != nil { prom.EtcdReqSeconds.With(prometheus.Labels{"method": method, "status": "failed"}).Observe(latency.Seconds()) return resp, err } prom.EtcdReqSeconds.With(prometheus.Labels{"method": method, "status": "ok"}).Observe(latency.Seconds()) return resp, err }
三 方法详解
1. Add - 添加
func (em *EtcdModelImpl) Add(ctx context.Context, key string, message proto.Message) (int64, error) { value, err := proto.Marshal(message) if err != nil { return 0, err } c, cancel := context.WithTimeout(ctx, em.queryTimeout) defer cancel() req := clientv3.OpPut(key, string(value)) cond := clientv3.Compare(clientv3.Version(key), "=", 0) resp, err := withLatency("put", func() (*clientv3.TxnResponse, error) { return em.client.Txn(c).If(cond).Then(req).Commit() }) if err != nil { return 0, err } if !resp.Succeeded { return 0, &util.EtcdKeyExistsError{Key: key} } return resp.Header.Revision, nil }
req := clientv3.OpPut(key, string(value))- 含义:创建一个 put 操作对象
req,表示“将 key 的值设置为 value”。
- 详细解释:
key是你要写入的键。string(value)是要写入的值(这里假设 value 是 []byte 类型,所以需要转成字符串)。
cond := clientv3.Compare(clientv3.Version(key), "=", 0)- 含义:创建一个比较条件,表示“只有当 key 的版本号为0时条件成立”。
- 详细解释:
clientv3.Compare用来构造一个比较条件,常用在 etcd 的事务(Txn)操作中。clientv3.Version(key)获取该 key 的版本号。"="表示相等比较。0表示“key 不存在”。在 etcd 中,key 不存在时版本号为0。
- 所以:这个条件的意思是“只有当 key 不存在时,条件成立”。
resp, err := withLatency("put", func() (*clientv3.TxnResponse, error) {
return em.client.Txn(c).If(cond).Then(req).Commit()
})- 含义:执行一个 etcd 事务操作,“如果 key 不存在,则写入”,并记录操作延迟。
- 详细解释:
withLatency("put", ...)这里是包装函数,通常用来统计操作耗时。em.client.Txn(c)创建一个事务对象,c是上下文(context)。.If(cond)设定事务的判断条件,即上面的“key 不存在”。.Then(req)如果条件成立,则执行“put”操作。.Commit()提交事务。如果条件不成立,则什么都不做。resp是事务响应,err是可能的错误。
实现“分布式锁”或“唯一写入”,防止 key 被重复写入。
2. AddWithLease - 带有效时间的添加
func (em *EtcdModelImpl) AddWithLease(ctx context.Context, key string, message proto.Message, leaseID clientv3.LeaseID) (int64, error) { value, err := proto.Marshal(message) if err != nil { return 0, err } c, cancel := context.WithTimeout(ctx, em.queryTimeout) defer cancel() req := clientv3.OpPut(key, string(value), clientv3.WithLease(leaseID)) cond := clientv3.Compare(clientv3.Version(key), "=", 0) resp, err := withLatency("put", func() (*clientv3.TxnResponse, error) { return em.client.Txn(c).If(cond).Then(req).Commit() }) if err != nil { return 0, err } if !resp.Succeeded { return 0, &util.EtcdKeyExistsError{Key: key} } return resp.Header.Revision, nil }
- 与 Add 类似,但数据会在租约到期后自动删除
- 适用于临时数据或需要自动清理的场景
req := clientv3.OpPut(key, string(value), clientv3.WithLease(leaseID))
- 作用:创建一个 put 操作:将
key的值设置为value,同时让该 key 绑定到指定的租约(leaseID)上。
- 详细解释:
clientv3.WithLease(leaseID):绑定租约。该 key 会和 leaseID 绑定,如果该租约过期(未续约),key 会自动被 etcd 删除。
- 用途:常用来实现自动过期的 key,比如分布式锁、服务注册等场景。
3. Update - 版本控制更新
func (em *EtcdModelImpl) Update(ctx context.Context, key string, message proto.Message, preRevision int64) (int64, error) { value, err := proto.Marshal(message) if err != nil { return 0, err } c, cancel := context.WithTimeout(ctx, em.queryTimeout) defer cancel() req := clientv3.OpPut(key, string(value), clientv3.WithPrevKV()) cond := clientv3.Compare(clientv3.ModRevision(key), "=", preRevision) resp, err := withLatency("put", func() (*clientv3.TxnResponse, error) { return em.client.Txn(c).If(cond).Then(req).Commit() }) if err != nil { return 0, err } if !resp.Succeeded { return 0, &util.RevisionConflictError{Given: preRevision} } return resp.Header.Revision, nil }
功能:基于指定版本号更新数据
- 使用 clientv3.Compare(clientv3.ModRevision(key), "=", preRevision) 确保版本一致性
- 防止并发更新冲突
- 如果版本不匹配,返回 RevisionConflictError
这段代码实现了 etcd 的乐观锁写入:只有 key 没被其他人修改过(modRevision 没变)时才写入新值,并把旧值返回。常用于高并发下的并发安全更新。
req := clientv3.OpPut(key, string(value), clientv3.WithPrevKV())
- 作用:构造一个 Put 操作请求。
- 细节说明:
clientv3.OpPut(key, string(value), ...):构造一个“写入 key 的值为 value”的操作。clientv3.WithPrevKV():表示返回该 key 被覆盖前的旧值(previous kv)。这样事务成功后,你可以从响应中拿到被覆盖前的内容。
- 用途:比如你想要知道本次 put 操作之前 key 的值,做日志或回滚等处理。
4. UpdateIgnoreRevision 方法 - 忽略版本更新
func (em *EtcdModelImpl) UpdateIgnoreRevision(ctx context.Context, key string, message proto.Message) (int64, error) { value, err := proto.Marshal(message) if err != nil { return 0, err } c, cancel := context.WithTimeout(ctx, em.queryTimeout) defer cancel() req := clientv3.OpPut(key, string(value), clientv3.WithPrevKV()) cond := clientv3.Compare(clientv3.ModRevision(key), "!=", 0) resp, err := withLatency("put", func() (*clientv3.TxnResponse, error) { return em.client.Txn(c).If(cond).Then(req).Commit() }) if err != nil { return 0, err } if !resp.Succeeded { return 0, &util.EtcdKeyExistsError{Key: key} } return resp.Header.Revision, nil }
功能:强制更新数据,不检查版本
- 条件:clientv3.Compare(clientv3.ModRevision(key), "!=", 0)(key 必须存在)
- 适用于不需要版本控制的场景
5. DeleteIgnoreRevision 方法 - 删除数据
func (em *EtcdModelImpl) DeleteIgnoreRevision(ctx context.Context, key string, prev proto.Message) (int64, error) { c, cancel := context.WithTimeout(ctx, em.queryTimeout) defer cancel() resp, err := withLatency("delete", func() (*clientv3.DeleteResponse, error) { return em.client.Delete(c, key, clientv3.WithPrevKV()) }) if err != nil { return 0, err } if resp.Deleted <= 0 { return 0, &util.EtcdKeyNotFoundError{Key: key} } if prev != nil { if err := proto.Unmarshal(resp.PrevKvs[0].Value, prev); err != nil { return 0, err } } return resp.Header.Revision, nil }
resp, err := withLatency("delete", func() (*clientv3.DeleteResponse, error) { return em.client.Delete(c, key, clientv3.WithPrevKV()) })
em.client.Delete(...):删除 etcd 里的指定 key。
clientv3.WithPrevKV():要求 etcd 返回被删除的 key 的旧值(prev kv)。
withLatency("delete", ...):统计 delete 操作的耗时(常用于监控)。
- 返回值
resp是 DeleteResponse,err是错误信息。
if prev != nil { if err := proto.Unmarshal(resp.PrevKvs[0].Value, prev); err != nil { return 0, err } }
- 如果调用方传递了
prev(一般是一个 protobuf 结构体的指针),将被删除 key 的旧值(resp.PrevKvs[0].Value)反序列化到 prev 里。
- 如果反序列化失败,返回错误。
- 这样调用方就能拿到删除前的对象内容。
6. Get 方法 - 获取数据
func (em *EtcdModelImpl) Get(ctx context.Context, key string, message proto.Message) (int64, error) { c, cancel := context.WithTimeout(ctx, em.queryTimeout) defer cancel() resp, err := withLatency("get", func() (*clientv3.GetResponse, error) { return em.client.Get(c, key) }) if err != nil { return 0, err } if len(resp.Kvs) == 0 { return 0, &util.EtcdKeyNotFoundError{Key: key} } if err := proto.Unmarshal(resp.Kvs[0].Value, message); err != nil { return 0, err } return resp.Kvs[0].ModRevision, nil }
功能
:获取指定 key 的数据
- 将数据反序列化到传入的 message 中
- 返回数据的 ModRevision
- 如果 key 不存在,返回 EtcdKeyNotFoundError
7. Put 方法 - 无条件写入
func (em *EtcdModelImpl) Put(ctx context.Context, key string, m proto.Message) (int64, error) { value, err := proto.Marshal(m) if err != nil { return 0, err } c, cancel := context.WithTimeout(ctx, em.queryTimeout) defer cancel() resp, err := withLatency("put", func() (*clientv3.PutResponse, error) { return em.client.Put(c, key, string(value)) }) if err != nil { return 0, err } return resp.Header.Revision, nil }
功能
:无条件写入或覆盖数据
- 不检查 key 是否存在
- 适用于强制写入场景
8. PutWithLease 方法 - 带租约的写入
func (em *EtcdModelImpl) PutWithLease(ctx context.Context, key string, m proto.Message, leaseID clientv3.LeaseID) (int64, error) { value, err := proto.Marshal(m) if err != nil { return 0, err } c, cancel := context.WithTimeout(ctx, em.queryTimeout) defer cancel() resp, err := withLatency("put", func() (*clientv3.PutResponse, error) { return em.client.Put(c, key, string(value), clientv3.WithLease(leaseID)) }) if err != nil { return 0, err } return resp.Header.Revision, nil }
功能
:写入数据并绑定租约
- 与 Put 类似,但数据会在租约到期后自动删除
9. 租约管理方法
GrantLease - 创建租约
func (em *EtcdModelImpl) GrantLease(ctx context.Context, ttl int64) (clientv3.LeaseID, error) { c, cancel := context.WithTimeout(ctx, em.queryTimeout) defer cancel() resp, err := withLatency("grant", func() (*clientv3.LeaseGrantResponse, error) { return em.client.Grant(c, ttl) }) if err != nil { return 0, err } return resp.ID, nil }
创建一个新的租约,指定 TTL(生存时间)
作用说明:
- 在 etcd 里申请一个租约(lease),ttl(生存时间,秒级)。
- 用于实现临时 key、锁、服务注册等需要自动过期的场景。
- 返回租约的 ID,调用者可以将 key 绑定到这个租约上。
KeepAliveLease - 续约
func (em *EtcdModelImpl) KeepAliveLease(ctx context.Context, id clientv3.LeaseID) error { c, cancel := context.WithTimeout(ctx, em.queryTimeout) defer cancel() _, err := withLatency("keep_alive", func() (*clientv3.LeaseKeepAliveResponse, error) { return em.client.KeepAliveOnce(c, id) }) return err }
作用说明:
- 对指定 leaseID 做一次续约,防止 lease 过期。
- 续约长度为创建时候的 ttl 长度。
- 适合手动定期续约的场景(比如定时任务、心跳)。
RevokeLease - 撤销租约
func (em *EtcdModelImpl) RevokeLease(ctx context.Context, id clientv3.LeaseID) error { c, cancel := context.WithTimeout(ctx, em.queryTimeout) defer cancel() _, err := withLatency("revoke", func() (*clientv3.LeaseRevokeResponse, error) { return em.client.Revoke(c, id) }) return err }
作用说明:
- 主动撤销租约。
- 所有绑定在这个租约上的 key 会立刻失效并被 etcd 删除。watch 相关的 key 的客户端会收到 delete 事件
- 典型应用:服务下线、锁释放、临时资源销毁。
10. Transaction 方法 - 事务操作
func (em *EtcdModelImpl) Transaction(ctx context.Context, op *TransactionOperator) (int64, error) { var conditions []clientv3.Cmp var ops []clientv3.Op updatedMessages := make(map[string][]byte) for _, m := range op.Adds { conditions = append(conditions, clientv3.Compare(clientv3.Version(m.Key), "=", 0)) value, err := proto.Marshal(m.Message) if err != nil { return 0, err } updatedMessages[m.Key] = value ops = append(ops, clientv3.OpPut(m.Key, string(value))) } for _, m := range op.Updates { if m.Revision > 0 { conditions = append(conditions, clientv3.Compare(clientv3.ModRevision(m.Key), "=", m.Revision)) } value, err := proto.Marshal(m.Message) if err != nil { return 0, err } updatedMessages[m.Key] = value ops = append(ops, clientv3.OpPut(m.Key, string(value))) } for _, m := range op.Deletes { if m.Revision > 0 { conditions = append(conditions, clientv3.Compare(clientv3.ModRevision(m.Key), "=", m.Revision)) } ops = append(ops, clientv3.OpDelete(m.Key, clientv3.WithPrevKV())) } if len(ops) == 0 { return 0, nil } c, cancel := context.WithTimeout(ctx, em.queryTimeout) defer cancel() resp, err := withLatency("transaction", func() (*clientv3.TxnResponse, error) { return em.client.Txn(c).If(conditions...).Then(ops...).Commit() }) if err != nil { return 0, err } if !resp.Succeeded { return 0, &util.RevisionConflictError{} } // This is a fast path to dispatch etcd events to watchers. // It's necessary as the scheduler relies on the integrity of the watching indexes. var events []EtcdEvent for _, r := range resp.Responses { if rr := r.GetResponseDeleteRange(); rr != nil { for _, kv := range rr.PrevKvs { events = append(events, EtcdEvent{ EventType: mvccpb.DELETE, Key: string(kv.Key), Value: kv.Value, Revision: resp.Header.Revision, }) } } } for key, value := range updatedMessages { events = append(events, EtcdEvent{ EventType: mvccpb.PUT, Key: key, Value: value, Revision: resp.Header.Revision, }) } em.watcher.dispatchEvents(events) return resp.Header.Revision, nil }
功能:执行原子事务操作
- 支持批量添加、更新、删除
- 所有操作要么全部成功,要么全部失败
- 唯一会手动触发事件分发的方法
- 通过 em.watcher.dispatchEvents(events) 立即通知 watchers
a. 构造事物条件与操作
Add(新增)
- 条件:只有当 key 不存在时(Version==0)才执行新增,防止已存在时被误覆盖。
- 操作:序列化 message,准备 put 操作。
- 结果:如果 key 已经存在,则事务整体失败(原子性保证)。
Update(更新)
- 条件:如指定了 Revision,则要求 key 的 ModRevision 和期望一致(乐观并发控制)。
- 操作:put 新值。
- 结果:若 revision 不符,说明有并发冲突,事务整体失败。
Delete(删除)
- 条件:如指定 Revision,则要求 key 的 ModRevision 和期望一致。
- 操作:删除并返回前值(WithPrevKV)。
- 结果:同样有冲突时整体失败。
b. 事务提交
if len(ops) == 0 { return 0, nil // 没有操作就直接返回 } c, cancel := context.WithTimeout(ctx, em.queryTimeout) defer cancel() resp, err := withLatency("transaction", func() (*clientv3.TxnResponse, error) { return em.client.Txn(c).If(conditions...).Then(ops...).Commit() }) if err != nil { return 0, err } if !resp.Succeeded { return 0, &util.RevisionConflictError{} }
- 用 etcd 事务 API 原子提交。
- 如果条件不成立,返回自定义的 RevisionConflictError。
c. 事件分发
var events []EtcdEvent for _, r := range resp.Responses { if rr := r.GetResponseDeleteRange(); rr != nil { for _, kv := range rr.PrevKvs { events = append(events, EtcdEvent{ EventType: mvccpb.DELETE, Key: string(kv.Key), Value: kv.Value, Revision: resp.Header.Revision, }) } } } for key, value := range updatedMessages { events = append(events, EtcdEvent{ EventType: mvccpb.PUT, Key: key, Value: value, Revision: resp.Header.Revision, }) } em.watcher.dispatchEvents(events)
- 对于删除操作,从 txn 响应中取出被删除 key 的前值,构造 DELETE 事件。
- 对于 add 和 update 操作,构造 PUT 事件。
d. 返回 revision
return resp.Header.Revision, nil
返回本次事务提交后的全局 revision,方便调用者做后续一致性操作。
