一 etcd中的事务

二 示例

    // 第一步:加锁(创建租约,确保租约不过期,使用租约抢占key)

    // 申请一个5秒租约
    lease := clientv3.NewLease(cli)
    leaseR, err := lease.Grant(context.TODO(), 5)
    if err != nil {
        fmt.Println("lease err:", err)
        return
    }

    // 第三步中的释放锁 准备一个用于取消自动续租的context
    ctx, cancelFunc := context.WithCancel(context.TODO())
    defer cancelFunc()
    defer lease.Revoke(context.TODO(), leaseR.ID)        // 释放租约

    // 自动续租 返回值是个只读的chan,因为写入只能是etcd实现
    keepChan, err := lease.KeepAlive(ctx, leaseR.ID )
    if err != nil {
        fmt.Println("keep err:", err)
        return
    }
    // 启动一个协程去消费chan的应答
    go func(){
        for {
            select {
            case keepR := <- keepChan:
                if keepChan == nil {        // 此时系统异常或者主动取消context
                    fmt.Println("租约失效")
                    goto END
                } else {        // 每秒续租一次
                    fmt.Println("收到自动续租应答:", keepR.ID)
                }
            }
        }
    END:
    }()

    // 使用事务判断key是否存在;判断其
    key := "/cron/lock/jobX"
    kv := clientv3.NewKV(cli)
    txn := kv.Txn(context.TODO())        // 分布式事务
    txn.If(clientv3.Compare(clientv3.CreateRevision(key), "=", 0)).
        Then(clientv3.OpPut(key, "xxx", clientv3.WithLease(leaseR.ID))).        // 一般这里val记录是哪个ID抢到
        Else(clientv3.OpGet(key))        // 否则抢锁失败

    // 提交事务
    txnR, err := txn.Commit()
    if err != nil {
        fmt.Println("txn失败:", err)
        return
    }
    // 判断是否抢到了锁
    if !txnR.Succeeded {
        fmt.Println("没抢到锁,锁已被占用;", string(txnR.Responses[0].GetResponseRange().Kvs[0].Value))
        return
    }

    // 第二步:业务代码书写
    fmt.Println("模拟处理任务")
    time.Sleep(time.Second * 5)

    // 第三步:释放锁(取消续租,释放租约)

多次执行上述方法,观察结果

results matching ""

    No results matching ""