Elasticsearch 并发修改乐观锁

edph2008 8年前

来自: http://blog.csdn.net//jiao_fuyou/article/details/50482117


版本控制的一个例子

curl -XPOST http://localhost:9200/test/test/1 -d '{"msg": "test"}'  {      "_index": "test",      "_type": "test",      "_id": "1",      "_version": 1,      "created": true  }  curl -XPOST http://localhost:9200/test/test/1 -d '{"msg": "test"}'  {      "_index": "test",      "_type": "test",      "_id": "1",      "_version": 2,      "created": true  }  curl -XPOST http://localhost:9200/test/test/1?version=1 -d '{"msg": "test"}'  {      "error": "RemoteTransportException[[elasticsearch_226][inet[/172.16.18.226:9300]][indices:data/write/index]]; nested: VersionConflictEngineException[[test][0] [test][1]: version conflict, current [4], provided [1]]; ",      "status": 409  }

提示:版本冲突

使用外部系统提供的版本号

Elasticsearch 的乐观锁,可以使用外部系统提供的版本号:

curl -XPOST http://localhost:9200/test/test/1?version=10&version_type=external -d '{"msg": "test"}'  {      "_index": "test",      "_type": "test",      "_id": "1",      "_version": 10,      "created": true  }    curl -XPOST http://localhost:9200/test/test/1?version=9&version_type=external -d '{"msg": "test"}'  {      "error": "VersionConflictEngineException[[test][0] [test][1]: version conflict, current [10], provided [9]]",      "status": 409  }

这时Elasticsearch将只检查提供的版本是否比当前保存在索引中的版本大(大多少不重要),如果是成功,否则失败。

Elasticsearch内部版本号

在Elasticsearch中,更新请求实际上是分为两个阶段,获取文档,修改文档,然后保存文档。
那么当两个更新请求同时要修改文档的时候,系统乐观的认为不会有两个并发请求对一个系统操作。
文档原本的版本为1,请求A获取了version为1的文档,请求B也获取了version为1的文档,然后请求A修改完文档后,并且先执行了保存操作,这个时候,系统中的文档version变为了2。
这个时候,B再执行保存操作的时候,告诉系统我要修改version为1的文档。系统就会抛出一个错误,说文档版本不匹配。然后这个错误由应用程序自己来进行控制。
这种机制在请求量大的时候会比悲观锁机制好。但是缺点是需要程序处理版本冲突错误,可能一般的方法是封装更新操作,并且设置重复重试次数。

注意这里的更新指的是:

curl -XPOST http://localhost:9200/test/test/1/_update -d '{"msg": "test"}'

而不是:

curl -XPOST http://localhost:9200/test/test/1 -d '{"msg": "test"}'

这个是create,这种在ES内部没有乐观锁的概念,是直接覆盖,这种想解决数据被覆盖,就要显示的带着version

下面的程序,并发6个进程同时批量更新一个文档,在结果能看到有的请求出错了,返回版本冲突。

<?php    require_once dirname(__FILE__).'/../shjf/vendor/autoload.php';    $client = new Elasticsearch\Client(['hosts' => ['172.16.18.226:9200']]);    for($i=0;$i<5;$i++){          if (pcntl_fork() == 0) break;  }    $params=[      'index' => 'test',      'type'  => 'test',      'body' => [],  ];      for($i=0;$i<1;$i++){      $params['body'][] = ['update' => ['_id' => 111]];      $params['body'][] = ["doc" => ["id" => $i]];   }  try   {       $result = $client->bulk($params);      var_dump($result) . "\n";  }   catch(Exception $e)   {       echo $e->getMessage() . "\n";      $info = $client->transport->getLastConnection()->getLastRequestInfo();      print_r($info);      exit;  }    echo "end\n";    ?>    {    "took":1,    "errors":true,    "items":[      {"update": {          "_index":"test",          "_type":"test",          "_id":"111",          "status":409,          "error":"VersionConflictEngineException[[test][0] [test][111]: version conflict, current [3], provided [2]]"}      }    ]  }

参考资料: