基于DTM分布式事务管理的PHP客户端

PHP技术
422
0
0
2022-08-05
DTM是一套跨语言的分布式事务解决方案,支持4种事务模型(Xa、Tcc、事务消息、Saga),针对分布式事务场景中可能出现的空回滚、悬挂问题,独创了子事务屏障,在大厂均有应用,但已有的php客户端只支持tcc,所以重新开发了这个组件。
一、启动本地dtm
DTM文档教程
git clone https://github.com/yedf/dtm && cd dtm
go mod download
go run app/main.go dtmsvr
二、安装客户端

composer require sett/dtmcli-php

三、示例代码
  • tcc
  • 特性:一个操作对应2段提交,预处理(try),确认(confirm)、回滚(cancel),确认与回滚不允许失败。

    // 127.0.0.1:36789为dtm默认端口
    $trans   = new TccTrans("127.0.0.1:36789");
    // 获取新事务ID
    $gid     = $trans->createNewGid();
    // 事务操作 
    $success = $trans->withOperate($gid, function (TccTrans $tccTrans) use ($baseUrl) {
        $result = $tccTrans->callBranch(
            ["amount" => 30],
            "$baseUrl/dtm/tcc/transOut",
            "$baseUrl/dtm/tcc/transOutConfirm",
            "$baseUrl/dtm/tcc/transOutCancel"
        );
        if (!$result) {
            echo "call branch fail\n";
            return false;
        }
        return $tccTrans->callBranch(
            ["amount" => 30],
            "$baseUrl/dtm/tcc/transIn",
            "$baseUrl/dtm/tcc/transInConfirm",
            "$baseUrl/dtm/tcc/transInCancel"
        );
    });
  • saga
  • 特性:一个操作对应1个回滚,任何一个失败,整个事务视为失败,回滚。

    $trans = new SagaTrans("127.0.0.1:36789");
    $gid   = $trans->createNewGid();
    $trans
        ->withGid($gid)
        ->withOperate("$baseUrl/dtm/saga/transOut", "$baseUrl/dtm/saga/transOutRevert", ["amount" => 30])
        ->withOperate("$baseUrl/dtm/saga/transIn", "$baseUrl/dtm/saga/transInRevert", ["amount" => 30]);
    $success = $trans->submit();
  • 事务消息
  • 特性:操作对应一个查询。

    $trans = new MsgTrans("127.0.0.1:36789");
    $gid   = $trans->createNewGid();
    $trans
        ->withOperate("$baseUrl/dtm/msg/transOut", ["amount" => 30])
        ->withOperate("$baseUrl/dtm/msg/transIn", ["amount" => 30])
        ->withQueryUrl("$baseUrl/dtm/msg/query")
        ->prepare();
    $success = $trans->submit();
  • 子事务屏障
  • 特性:防止空回滚,处理悬挂,幂等等问题。

    class UserDatabase implements IDatabase
     {

      public function execute(string $query) {
          // TODO: Implement execute() method.
      }

      public function query(string $query): bool {
          // TODO: Implement query() method.
      }

      public function rollback() {
          // TODO: Implement rollback() method.
      }

      public function commit() {
          // TODO: Implement commit() method.
      }
     }

    $baseUrl = "http://127.0.0.1:18310";
    try {
        $trans    = new BarrierTrans([
        "trans_type" => "tcc",
        "gid"        => "ac130059_4pQHea5Xtsq",
        "op"         => "prepare",
        "branch_id"  => "01"
        ]);
        $database = new UserDatabase();
        $success  = $trans->call($database, function (IDatabase $database) {
           // 使用当前数据库连接操作,保证所有操作都在一个本地事务中 
           // do what you want... 
           return true;
        });
        echo "transaction result {$success}";
    } catch (Exception $exception) {
        var_dump($exception->getTraceAsString());
        echo "exception with error " . $exception->getMessage();
    }
  • Xa
  • 特性:基于数据库本地事务,保证操作在同一个事务中,使用的场景比较少,不写了