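Environment: Spring Boot 2.3.12.RELEASE + Elasticsearch 7.8.0
Because the task is triggered manually, we want the data generated by each run to be written to a different index.
Environment configuration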
<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
  </dependency>
  <dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.8.0</version><!--$NO-MVN-MAN-VER$-->
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
  </dependency>
</dependencies>
spring.elasticsearch.index.task=t_task_
spring.elasticsearch.rest.uris=http://localhost:9200
Create the index template
PUT http://localhost:9200/_template/t_task_tml
{
  "index_patterns": [
    "t_task_*"
  ],
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 0,
    "index": {
      "refresh_interval": "60s",
      "translog": {
        "sync_interval": "20s"
      }
    }
  },
  "mappings": {
    "_source": {
      "enabled": true
    },
    "properties": {
      "taskId": {
        "type": "keyword"
      },
      "taskName": {
        "type": "keyword"
      },
      "createTime": {
        "type": "date",
        "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
      }
    }
  }
}
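With the index template in place, a separate index can be created for each task without repeating the mapping and settings: any new index whose name matches t_task_* picks them up from the template.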
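The per-task index name is simply the configured prefix plus a suffix, and the template only applies when that name matches t_task_*. As a rough sketch of that naming rule (the class TaskIndexNames and both of its methods are hypothetical helpers, not part of the original project, and the illegal-character check covers only a subset of what Elasticsearch rejects), one could validate the name before writing:

```java
import java.util.Locale;
import java.util.regex.Pattern;

/** Hypothetical helper for illustration; not part of the original article. */
final class TaskIndexNames {

    // A subset of the characters Elasticsearch rejects in index names, plus whitespace.
    private static final Pattern ILLEGAL = Pattern.compile("[\\\\/*?\"<>|,#\\s]");

    private TaskIndexNames() {
    }

    /** Joins prefix and suffix, lowercases the result and rejects illegal characters. */
    static String build(String prefix, String suffix) {
        if (suffix == null || suffix.isEmpty()) {
            throw new IllegalArgumentException("suffix must not be empty");
        }
        // Index names must be lowercase.
        String name = (prefix + suffix).toLowerCase(Locale.ROOT);
        if (ILLEGAL.matcher(name).find()) {
            throw new IllegalArgumentException("illegal character in index name: " + name);
        }
        return name;
    }

    /** Checks a name against a simple wildcard pattern such as "t_task_*". */
    static boolean matchesPattern(String pattern, String indexName) {
        // Quote the literal parts and turn each '*' into ".*".
        String regex = Pattern.quote(pattern).replace("*", "\\E.*\\Q");
        return indexName.matches(regex);
    }

    public static void main(String[] args) {
        String name = build("t_task_", "20210601");
        System.out.println(name + " matches t_task_*: " + matchesPattern("t_task_*", name));
    }
}
```

A date-based suffix such as 20210601 is only one possible convention; any suffix that keeps the name lowercase and free of illegal characters would work the same way.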
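Bulk operation
import java.util.List;
import javax.annotation.Resource;
import org.elasticsearch.client.RestHighLevelClient;

public abstract class BulkOperator<T> {
    @Resource
    protected RestHighLevelClient client ;
    // Writes datas to the index named <prefix> + suffix
    public abstract void bulkWriter(List<T> datas, String suffix) ;
}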
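import java.io.IOException;
import java.util.List;

import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

@Service
public class TaskBulkService extends BulkOperator<Task> {
    private static final Logger logger = LoggerFactory.getLogger(TaskBulkService.class) ;
    // Index name prefix; the per-task suffix is appended at write time
    @Value(value = "${spring.elasticsearch.index.task:t_task_}")
    private String index_name ;
    @Override
    public void bulkWriter(List<Task> datas, String suffix) {
        BulkRequest bulkRequest = new BulkRequest() ;
        bulkRequest.timeout(TimeValue.timeValueMinutes(1)) ;
        ObjectMapper objectMapper = new ObjectMapper() ;
        datas.forEach(task -> {
            try {
                bulkRequest.add(new IndexRequest(index_name + suffix).id(task.getId()).source(objectMapper.writeValueAsString(task), XContentType.JSON)) ;
            } catch (JsonProcessingException e) {
                logger.error("Failed to serialize Task to JSON", e) ;
            }
        });
        // An empty bulk request fails validation, so skip the call
        if (bulkRequest.numberOfActions() == 0) {
            return ;
        }
        try {
            BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT) ;
            // The response status is OK even when individual items fail, so check the items
            if (response.hasFailures()) {
                logger.error("Bulk save failed: {}", response.buildFailureMessage()) ;
            }
        } catch (IOException e) {
            logger.error("Bulk save error", e) ;
        }
    }
}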
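bulkWriter sends the whole list in a single bulk request. For large lists it is common to split the data into smaller batches and call bulkWriter once per batch. The following is only a sketch of that idea: the class Batches and its partition method are hypothetical, and a suitable batch size is something to measure for your own documents rather than a value taken from the article.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper for illustration; not part of the original article. */
final class Batches {

    private Batches() {
    }

    /** Splits a list into consecutive chunks of at most batchSize elements. */
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < items.size(); from += batchSize) {
            int to = Math.min(from + batchSize, items.size());
            // Copy the sub-list so each batch is independent of the source list.
            batches.add(new ArrayList<>(items.subList(from, to)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> ids = Arrays.asList(1, 2, 3, 4, 5);
        // Each chunk would be passed to bulkWriter(chunk, suffix) in turn.
        System.out.println(partition(ids, 2)); // prints [[1, 2], [3, 4], [5]]
    }
}
```

Since every batch goes to the same index name, the suffix passed to bulkWriter stays the same across the calls of one task run.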
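Test
// bulkOperator is the injected TaskBulkService (the field declaration is not shown here)
@GetMapping("/es/bulk")
public Object bulk(String suffix) {
    List<Task> datas = new ArrayList<>() ;
    for (int i = 0; i < 10; i++) {
        Task task = new Task() ;
        task.setTaskId("taskId " + i) ;
        task.setTaskName("taskName " + i) ;
        // Must match the date format in the template; a value like "createTime 0" is rejected by the mapping
        task.setCreateTime("2021-06-01 12:00:0" + i) ;
        datas.add(task) ;
    }
    bulkOperator.bulkWriter(datas, suffix);
    return "success" ;
}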
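The index is created automatically on the first write, with the settings and mapping taken from the template. For example, calling /es/bulk?suffix=20210601 writes to the index t_task_20210601.
Done!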
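The createTime field is mapped as a date with the format yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis, so the test data has to supply values in one of those forms. A minimal sketch of producing such a string with java.time, assuming Task.setCreateTime takes a String as in the test above (the class CreateTimes is a hypothetical helper, not part of the original project):

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper for illustration; not part of the original article. */
final class CreateTimes {

    // Same pattern as the first alternative in the template's "format" setting.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    private CreateTimes() {
    }

    /** Formats a timestamp so that the createTime mapping accepts it. */
    static String format(LocalDateTime time) {
        return time.format(FORMAT);
    }

    public static void main(String[] args) {
        // In the test controller this could be used as task.setCreateTime(CreateTimes.format(...)).
        System.out.println(format(LocalDateTime.of(2021, 6, 1, 12, 0, 5))); // prints 2021-06-01 12:00:05
    }
}
```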