版本控制的一个例子
curl -XPOST http://localhost:9200/test/test/1 -d '{"msg": "test"}'
{
"_index": "test",
"_type": "test",
"_id": "1",
"_version": 1,
"created": true
}
curl -XPOST http://localhost:9200/test/test/1 -d '{"msg": "test"}'
{
"_index": "test",
"_type": "test",
"_id": "1",
"_version": 2,
"created": true
}
curl -XPOST http://localhost:9200/test/test/1?version=1 -d '{"msg": "test"}'
{
"error": "RemoteTransportException[[elasticsearch_226][inet[/172.16.18.226:9300]][indices:data/write/index]]; nested: VersionConflictEngineException[[test][0] [test][1]: version conflict, current [4], provided [1]]; ",
"status": 409
}
提示:版本冲突
使用外部系统提供的版本号
Elasticsearch 的乐观锁,可以使用外部系统提供的版本号:
curl -XPOST http://localhost:9200/test/test/1?version=10&version_type=external -d '{"msg": "test"}'
{
"_index": "test",
"_type": "test",
"_id": "1",
"_version": 10,
"created": true
}
curl -XPOST http://localhost:9200/test/test/1?version=9&version_type=external -d '{"msg": "test"}'
{
"error": "VersionConflictEngineException[[test][0] [test][1]: version conflict, current [10], provided [9]]",
"status": 409
}
这时Elasticsearch将只检查提供的版本是否比当前保存在索引中的版本大(大多少不重要),如果是成功,否则失败。
Elasticsearch内部版本号
在Elasticsearch中,更新请求实际上是分为两个阶段,获取文档,修改文档,然后保存文档。
那么当两个更新请求同时要修改文档的时候,系统乐观的认为不会有两个并发请求对一个系统操作。
文档原本的版本为1,请求A获取了version为1的文档,请求B也获取了version为1的文档,然后请求A修改完文档后,并且先执行了保存操作,这个时候,系统中的文档version变为了2。
这个时候,B再执行保存操作的时候,告诉系统我要修改version为1的文档。系统就会抛出一个错误,说文档版本不匹配。然后这个错误由应用程序自己来进行控制。
这种机制在请求量大的时候会比悲观锁机制好。但是缺点是需要程序处理版本冲突错误,可能一般的方法是封装更新操作,并且设置重复重试次数。
注意这里的更新指的是:
curl -XPOST http://localhost:9200/test/test/1/_update -d '{"msg": "test"}'
而不是:
curl -XPOST http://localhost:9200/test/test/1 -d '{"msg": "test"}'
这个是create,这种在ES内部没有乐观锁的概念,是直接覆盖,这种想解决数据被覆盖,就要显示的带着version
下面的程序,并发6个进程同时批量更新一个文档,在结果能看到有的请求出错了,返回版本冲突。
<?php
require_once dirname(__FILE__).'/../shjf/vendor/autoload.php';
$client = new Elasticsearch\Client(['hosts' => ['172.16.18.226:9200']]);
for($i=0;$i<5;$i++){
if (pcntl_fork() == 0) break;
}
$params=[
'index' => 'test',
'type' => 'test',
'body' => [],
];
for($i=0;$i<1;$i++){
$params['body'][] = ['update' => ['_id' => 111]];
$params['body'][] = ["doc" => ["id" => $i]];
}
try
{
$result = $client->bulk($params);
var_dump($result) . "\n";
}
catch(Exception $e)
{
echo $e->getMessage() . "\n";
$info = $client->transport->getLastConnection()->getLastRequestInfo();
print_r($info);
exit;
}
echo "end\n";
?>
{
"took":1,
"errors":true,
"items":[
{"update": {
"_index":"test",
"_type":"test",
"_id":"111",
"status":409,
"error":"VersionConflictEngineException[[test][0] [test][111]: version conflict, current [3], provided [2]]"}
}
]
}
参考资料:
- Elasticsearch官网:
- 《Elasticsearch权威指南》处理冲突