Because of constraints in the project environment we are stuck on Flink 1.7.2, which is fairly old: it only ships an Elasticsearch 6 connector and has no built-in Flink ES7 connector, so the integration and data handling had to be done with RestHighLevelClient. I ran into quite a few pitfalls along the way; this post records how they were resolved.
1. Component versions
| Component | Version |
| --- | --- |
| jdk | 1.8 |
| flink | 1.7.2 |
| kafka | 1.0.0 |
| elasticsearch | 7.7.0 |
2. POM dependencies
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_2.11</artifactId>
<version>1.7.2</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_2.12</artifactId>
<version>1.7.2</version>
<exclusions>
<!-- Exclude log4j-api to avoid conflicts with Flink's own logging framework, which would otherwise prevent logs from printing properly in the Flink manager -->
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.7.22</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.9.9</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.9</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.9.9</version>
</dependency>
</dependencies>
3. ES client connection utility class
public class ElasticsearchUtil {

    public static final Logger log = LoggerFactory.getLogger(ElasticsearchUtil.class);

    public static RestHighLevelClient initEsRestHighLevelClient(boolean debug, String esClusterIp, String esUsername, String esPwd) {
        RestHighLevelClient esRestHighLevelClient = null;
        try {
            // Configure basic authentication with the username and password
            final BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(AuthScope.ANY,
                    new UsernamePasswordCredentials(esUsername, esPwd));
            // Split esClusterIp ("host1:port1,host2:port2,...") into individual addresses
            String[] addresses = esClusterIp.split(",");
            if (debug) {
                // Only log connection details (they include the credentials) in debug mode
                log.info("Creating authenticated RestHighLevelClient, hosts: " + esClusterIp + ", userName: " + esUsername + ", pwd: " + esPwd);
            }
            // Build the HttpHost array
            HttpHost[] httpHosts = new HttpHost[addresses.length];
            for (int i = 0; i < addresses.length; i++) {
                String[] ipPort = addresses[i].split(":");
                httpHosts[i] = new HttpHost(ipPort[0], Integer.parseInt(ipPort[1]));
            }
            // Create the RestHighLevelClient
            esRestHighLevelClient = new RestHighLevelClient(
                    RestClient.builder(httpHosts)
                            .setHttpClientConfigCallback(httpClientBuilder ->
                                    httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider))
            );
        } catch (Throwable e) {
            log.error("Failed to initialize the ES client:", e);
        }
        return esRestHighLevelClient;
    }
}
Note: after experimenting with many ES client jars there were a lot of dependency conflicts, and the resulting exceptions were thrown at the Error level. They must be caught with Throwable; catching only Exception causes them to be swallowed silently.
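To sanity-check the utility outside of Flink, a quick standalone test like the one below can be used. This is a minimal sketch and not part of the original project; the host addresses and credentials are placeholders.

```java
import org.elasticsearch.client.RestHighLevelClient;

// Minimal smoke test for ElasticsearchUtil (not part of the original project);
// the host addresses and credentials below are placeholders.
public class EsClientSmokeTest {
    public static void main(String[] args) throws Exception {
        RestHighLevelClient client = ElasticsearchUtil.initEsRestHighLevelClient(
                true, "10.0.0.1:9200,10.0.0.2:9200", "elastic", "changeme");
        try {
            // ping() (the Header-varargs variant of the 6.x client) returns true when the cluster responds
            System.out.println("Elasticsearch reachable: " + client.ping());
        } finally {
            client.close();
        }
    }
}
```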
4. Kafka message data structure
@Data // Lombok generates the getters/setters that the sink and transform code below rely on
public class KafkaInputData implements Serializable {

    private static final long serialVersionUID = 6966279877727235933L;
    /**
     * topic
     */
    private String topic;
    /**
     * partition
     */
    private Integer partition;
    /**
     * offset
     */
    private Long offset;
    /**
     * key
     */
    private String key;
    /**
     * message value
     */
    private String value;
    /**
     * timestamp
     */
    private Long timestamp;
    /**
     * Table name, including schema and table name; in the source message the field name is upper case
     */
    private String table;
    /**
     * Operation type: I, U, D or DDL, meaning the record is an insert, update, delete or DDL change; in the source message the field name is upper case
     */
    private String op_type;
    /**
     * Database operation time, e.g. 2021-02-04 12:17:49.00006
     */
    private String op_ts;
    /**
     * Timestamp when the record was sent to Kafka, e.g. 1706274886276
     */
    private String ts_ms;
    /**
     * Time when the record was sent to Kafka, e.g. 2021-02-04 20:16:55.018000
     */
    private String current_ts;
    /**
     * Offset recorded in the OGG trail file, globally monotonically increasing within the Kafka topic, e.g. 00000000010000112006
     */
    private String pos;
    /**
     * Primary key field names of the source record, as an array (multiple keys separated by commas), field names in upper case, e.g. ["ID","BI_SFDM"]
     */
    private List<String> primary_keys;
    /**
     * Concatenated primary key value
     */
    private String primarykeyValue;
    /**
     * Field values of the source record before the change; present only when op_type is U or D, e.g. {"BI_SFDM":"03","ID":6,"NAME":"test01","VALUE":"TEST01"}
     */
    private Map<String, Object> before;
    /**
     * Field values of the source record after the change; present only when op_type is I, U or DDL, e.g. {"BI_SFDM":"03","ID":6,"NAME":"test01","VALUE":"TEST01"}
     */
    private Map<String, Object> after;
}
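The sink below switches on EnumOpType, which the original post does not show. Here is a minimal sketch of what it might look like, assuming it simply maps the op_type codes documented above (I/U/D/DDL) to enum constants:

```java
// Hypothetical EnumOpType (not shown in the original post), mapping the op_type codes
// I/U/D/DDL documented above to the constants used in the sink's switch statement.
public enum EnumOpType {
    INSERT("I"),
    UPDATE("U"),
    DELETE("D"),
    DDL("DDL"),
    UNKNOWN("");

    private final String code;

    EnumOpType(String code) {
        this.code = code;
    }

    public static EnumOpType of(String code) {
        for (EnumOpType type : values()) {
            if (type.code.equalsIgnoreCase(code)) {
                return type;
            }
        }
        return UNKNOWN;
    }
}
```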
5. Developing the data sink
public class BaseSinkFunction extends RichSinkFunction<List<KafkaInputData>> {

    protected static Logger log = LoggerFactory.getLogger(BaseSinkFunction.class);
    /**
     * Application context
     */
    protected InputStreamSynContext inputStreamSynContext;
    /**
     * ES client (not serializable, hence transient; created in open())
     */
    protected transient RestHighLevelClient esRestHighLevelClient;

    public BaseSinkFunction(InputStreamSynContext inputStreamSynContext) {
        this.inputStreamSynContext = inputStreamSynContext;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        this.esRestHighLevelClient = ElasticsearchUtil.initEsRestHighLevelClient(inputStreamSynContext.isDebug(), inputStreamSynContext.getEsClusterIp(),
                inputStreamSynContext.getEsUsername(), inputStreamSynContext.getEsPwd());
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (null != this.esRestHighLevelClient) {
            this.esRestHighLevelClient.close();
        }
    }

    @Override
    public void invoke(List<KafkaInputData> kafkaDataList, Context context) throws Exception {
        long startTime = System.currentTimeMillis();
        /************** Update this sink's own table first **************/
        this.updateTable(this.getSinkTable(), kafkaDataList, context);
        long endTime = System.currentTimeMillis();
    }

    public String getSinkTable() {
        return "";
    }

    /**
     * Update the data of one table (ES index)
     *
     * @param tableName
     * @param kafkaDataList
     * @param context
     */
    public void updateTable(String tableName, List<KafkaInputData> kafkaDataList, Context context) {
        try {
            long startTime = System.currentTimeMillis();
            // Number of records per bulk batch
            final int batchSize = this.inputStreamSynContext.getEsBathSize();
            int totalSize = kafkaDataList.size();
            for (int i = 0; i < totalSize; i += batchSize) {
                // Compute the end index of the current batch without exceeding the list size
                int batchEnd = Math.min(i + batchSize, totalSize);
                List<KafkaInputData> batchKafkaDataList = kafkaDataList.subList(i, batchEnd);
                // Build a fresh BulkRequest per batch so earlier batches are not re-sent
                BulkRequest bulkRequest = new BulkRequest();
                for (KafkaInputData kafkaInputData : batchKafkaDataList) {
                    if (null == kafkaInputData || StringUtils.isEmpty(kafkaInputData.getOp_type())) {
                        // Skip invalid records instead of aborting the whole batch
                        continue;
                    }
                    // Choose the insert/update/delete operation based on the operation type
                    String primarykeyValue = kafkaInputData.getPrimarykeyValue();
                    Map<String, Object> data = kafkaInputData.getAfter();
                    log.info("Record payload: " + JsonUtil.toJson(data));
                    switch (EnumOpType.of(kafkaInputData.getOp_type())) {
                        case INSERT: // Index a document with an explicit ID: replace it if it exists, otherwise create it
                            IndexRequest indexRequest = new IndexRequest(tableName, "_doc").id(primarykeyValue).source(data);
                            bulkRequest.add(indexRequest);
                            break;
                        case DELETE: // Delete a document by ID
                            DeleteRequest deleteRequest = new DeleteRequest(tableName, "_doc", primarykeyValue);
                            bulkRequest.add(deleteRequest);
                            break;
                        case UPDATE: // Update a document by ID; only the given fields are changed
                            UpdateRequest updateRequest = new UpdateRequest(tableName, "_doc", primarykeyValue).doc(data);
                            bulkRequest.add(updateRequest);
                            break;
                        default:
                            break;
                    }
                }
                if (bulkRequest.numberOfActions() == 0) {
                    continue;
                }
                // WAIT_UNTIL blocks until all changes are visible to search before continuing
                bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
                BulkResponse bulkResponse = this.esRestHighLevelClient.bulk(bulkRequest, new BasicHeader("Content-Type", "application/json"));
                if (bulkResponse.hasFailures()) {
                    log.error("Bulk update of table {} failed: {}", tableName, bulkResponse.buildFailureMessage());
                } else {
                    log.info("Bulk update of table {} succeeded", tableName);
                }
            }
        } catch (Throwable e) {
            log.error("Bulk write to ES failed, index: " + tableName + ", cause:", e);
        }
    }
}
Notes:
- RestHighLevelClient implements Closeable but is not serializable, so the field must be declared transient and the client created in open().
- When the ES 6 client writes to an ES 7 server: ES 7 has removed multiple mapping types, so the type in the IndexRequest must be set to the single value "_doc".
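InputStreamSynContext is referenced throughout but never shown in the post. Below is a minimal sketch of the configuration holder, assuming it carries only the values the code above actually reads; the field names are guesses derived from the getters.

```java
import java.io.Serializable;
import lombok.Data;

// Hypothetical InputStreamSynContext (not shown in the original post), limited to the
// getters used in this article; Lombok @Data generates isDebug()/getEsClusterIp()/... etc.
@Data
public class InputStreamSynContext implements Serializable {
    private boolean debug;        // whether to log verbose debug information
    private String esClusterIp;   // "host1:9200,host2:9200"
    private String esUsername;
    private String esPwd;
    private int esBathSize;       // bulk batch size (name kept as spelled in the original code)
    private int parallelism;      // operator parallelism for the transform/sink operators
}
```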
6. Reading data from Kafka
protected DataStream<String> source() {
    Properties properties = new Properties();
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, StringUtils.defaultString(offset, "latest"));
    // The consumer side needs deserializers, not serializers
    properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("security.protocol", securityProtocol);
    properties.put("sasl.mechanism", saslMechanism);
    properties.put("sasl.tbds.secure.id", saslTbdsSecureId);
    properties.put("sasl.tbds.secure.key", saslTbdsSecureKey);
    FlinkKafkaConsumer010<String> flinkKafkaConsumer = new FlinkKafkaConsumer010<>(
            Pattern.compile(this.params.get(AppBaseConstant.GLOBAL_KAFKA_TOPICS)),
            new SimpleStringSchema(), properties);
    return streamExecutionEnv.addSource(flinkKafkaConsumer)
            .setParallelism(this.params.getInt(AppBaseConstant.GLOBAL_PARALLELISM, 1))
            .name("Consume data from Kafka");
}
Note: the FlinkKafkaConsumer010 class must match the flink-connector-kafka artifact declared in the POM (flink-connector-kafka-0.10 here).
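JsonUtil, used in steps 5 and 7, is also not shown in the post. A minimal Jackson-based sketch (Jackson is already declared in the POM); the method names match the toJson/readValue calls used in this article, everything else is an assumption.

```java
import java.io.IOException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;

// Hypothetical JsonUtil (not shown in the original post), built on the Jackson dependencies in the POM.
public class JsonUtil {
    private static final ObjectMapper MAPPER = new ObjectMapper()
            // OGG messages may carry fields the POJO does not declare; ignore them
            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

    public static String toJson(Object value) {
        try {
            return MAPPER.writeValueAsString(value);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    public static <T> T readValue(String json, Class<T> clazz) throws IOException {
        return MAPPER.readValue(json, clazz);
    }
}
```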
7. Transforming the Kafka data
protected SingleOutputStreamOperator<KafkaInputData> transform(
final DataStream<String> dataStream) {
SingleOutputStreamOperator<KafkaInputData> transformDataStream = dataStream
.map(new MapFunction<String, KafkaInputData>() {
@Override
public KafkaInputData map(String s) throws Exception {
return JsonUtil.readValue(s, KafkaInputData.class);
}
}).map(new DataMapFunction()).name("Parallel: province code mapping, lowercase conversion, field filtering, type conversion, field replacement")
.setParallelism(this.inputStreamSynContext.getParallelism());
return transformDataStream;
}
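DataMapFunction is not shown in the post either. Below is a sketch of just one piece of what it presumably does: deriving primarykeyValue (which the ES sink uses as the document ID) from primary_keys and the before/after image. The separator and the omitted logic (code mapping, lowercase conversion, field filtering, type conversion) are assumptions.

```java
import java.util.Map;
import org.apache.flink.api.common.functions.MapFunction;

// Hypothetical sketch of DataMapFunction (not shown in the original post): it only derives
// primarykeyValue from primary_keys and the row image; the real implementation also performs
// the code mapping, lowercase conversion, field filtering and type conversion named above.
public class DataMapFunction implements MapFunction<KafkaInputData, KafkaInputData> {
    @Override
    public KafkaInputData map(KafkaInputData data) throws Exception {
        // Use the after image for inserts/updates, fall back to the before image for deletes
        Map<String, Object> image = data.getAfter() != null ? data.getAfter() : data.getBefore();
        if (data.getPrimary_keys() != null && image != null) {
            StringBuilder key = new StringBuilder();
            for (String pk : data.getPrimary_keys()) {
                if (key.length() > 0) {
                    key.append("_"); // separator is an assumption
                }
                key.append(image.get(pk));
            }
            data.setPrimarykeyValue(key.toString());
        }
        return data;
    }
}
```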
8. Writing the data
protected void sink(SingleOutputStreamOperator<KafkaInputData> dataStream) {
    // Wrap each record in a single-element list so it matches the sink's List<KafkaInputData> input type
    SingleOutputStreamOperator<List<KafkaInputData>> listStream = dataStream
            .map(new MapFunction<KafkaInputData, List<KafkaInputData>>() {
                private static final long serialVersionUID = -8093040450345066252L;

                @Override
                public List<KafkaInputData> map(KafkaInputData kafkaInputData) throws Exception {
                    List<KafkaInputData> dataList = new ArrayList<>();
                    dataList.add(kafkaInputData);
                    return dataList;
                }
            }).setParallelism(this.inputStreamSynContext.getParallelism());
    listStream.addSink(SinkFunctionFactory.create(inputStreamSynContext))
            .setParallelism(this.inputStreamSynContext.getParallelism())
            .name("Sink to ES");
}
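SinkFunctionFactory, used above to create the sink, is not shown in the post. A minimal sketch, assuming it simply instantiates a BaseSinkFunction for a target index; in the real project it presumably picks a concrete subclass per table, and "demo_index" below is a placeholder.

```java
// Hypothetical SinkFunctionFactory (not shown in the original post). It only illustrates the
// create(...) call used above; "demo_index" is a placeholder target index name.
public class SinkFunctionFactory {
    public static BaseSinkFunction create(InputStreamSynContext context) {
        return new BaseSinkFunction(context) {
            @Override
            public String getSinkTable() {
                return "demo_index";
            }
        };
    }
}
```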