Writing Data to ES7 from an Older Flink Version


Because of constraints in the project environment we are stuck on Flink 1.7.2, which is fairly old: it only ships an Elasticsearch 6 connector and has no ES7 connector. We therefore had to integrate RestHighLevelClient ourselves to handle the data, and ran into quite a few pitfalls along the way. This post records how they were solved.

1. Component Versions

Component        Version
jdk              1.8
flink            1.7.2
kafka            1.0.0
elasticsearch    7.7.0

2. POM Dependencies

<dependencies>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>1.0.0</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>1.7.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.11</artifactId>
      <version>1.7.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
      <version>1.7.2</version>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.18.22</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-elasticsearch6_2.12</artifactId>
      <version>1.7.2</version>
      <exclusions>
        <!-- Exclude log4j to avoid clashing with Flink's own logging framework, which would otherwise prevent logs from printing properly in the Flink (task) manager -->
        <exclusion>
          <groupId>org.apache.logging.log4j</groupId>
          <artifactId>log4j-api</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.83</version>
    </dependency>
    <dependency>
      <groupId>cn.hutool</groupId>
      <artifactId>hutool-all</artifactId>
      <version>5.7.22</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-core</artifactId>
      <version>2.9.9</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>2.9.9</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-annotations</artifactId>
      <version>2.9.9</version>
    </dependency>
</dependencies>

3. ES Client Connection Utility Class

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ElasticsearchUtil {

    public static final Logger log = LoggerFactory.getLogger(ElasticsearchUtil.class);

    public static RestHighLevelClient initEsRestHighLevelClient(boolean debug, String esClusterIp, String esUsername, String esPwd) {
        RestHighLevelClient esRestHighLevelClient = null;
        try {
            // Set up basic authentication with the given username and password
            final BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(AuthScope.ANY,
                    new UsernamePasswordCredentials(esUsername, esPwd));

            // Split esClusterIp into the individual host:port addresses
            String[] addresses = esClusterIp.split(",");
            log.info("Creating RestHighLevelClient with authentication, hosts: " + esClusterIp + ", userName: " + esUsername + ", pwd: " + esPwd);

            // Build the HttpHost array
            HttpHost[] httpHosts = new HttpHost[addresses.length];
            for (int i = 0; i < addresses.length; i++) {
                String[] ipPort = addresses[i].split(":");
                httpHosts[i] = new HttpHost(ipPort[0], Integer.parseInt(ipPort[1]));
            }

            // Create the RestHighLevelClient
            esRestHighLevelClient = new RestHighLevelClient(
                    RestClient.builder(httpHosts)
                            .setHttpClientConfigCallback(httpClientBuilder ->
                                    httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider))
            );
        } catch (Throwable e) {
            // Conflicting ES jars can surface as Errors, hence Throwable (see the note below)
            log.error("Failed to initialize the ES client:", e);
        }
        return esRestHighLevelClient;
    }

}

Note: we experimented with many ES packages and hit a lot of conflicts; the resulting exceptions are thrown at the Error level, so they must be caught with Throwable. Catching only Exception causes the errors to be swallowed silently.
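For completeness, here is a minimal sketch of how the utility class might be exercised before wiring it into the Flink job. The host list, credentials and the ping() check are illustrative assumptions and not part of the original code:

import org.elasticsearch.client.RestHighLevelClient;

public class EsClientSmokeTest {

    public static void main(String[] args) throws Exception {
        // Hosts and credentials below are placeholders, not values from the original post
        RestHighLevelClient client = ElasticsearchUtil.initEsRestHighLevelClient(
                false,                           // debug flag
                "10.0.0.1:9200,10.0.0.2:9200",   // comma-separated host:port list
                "elastic",                       // ES username
                "changeme");                     // ES password
        try {
            // ping() on the 6.x high-level client returns true if the cluster responds
            System.out.println("ES cluster reachable: " + client.ping());
        } finally {
            client.close();
        }
    }
}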

4. Kafka Message Data Structure

import java.io.Serializable;
import java.util.List;
import java.util.Map;

import lombok.Data;

// Lombok @Data generates the getters/setters used by the sink below
@Data
public class KafkaInputData implements Serializable {

    private static final long serialVersionUID = 6966279877727235933L;

    /**
     * topic
     */
    private String topic;

    /**
     * Partition
     */
    private Integer partition;

    /**
     * Offset
     */
    private Long offset;

    /**
     * key
     */
    private String key;

    /**
     * Message value
     */
    private String value;

    /**
     * Timestamp
     */
    private Long timestamp;

    /**
     * Table name, including the schema and table name; field names are required to be uppercase
     */
    private String table;

    /**
     * Operation type; the values I, U, D and DDL mean the record is an insert, update, delete or DDL change respectively; field names are required to be uppercase
     */
    private String op_type;

    /**
     * Database operation time, e.g. 2021-02-04 12:17:49.00006
     */
    private String op_ts;

    /**
     * Timestamp at which the record was sent to Kafka, e.g. 1706274886276
     */
    private String ts_ms;

    /**
     * Time at which the record was sent to Kafka, e.g. 2021-02-04 20:16:55.018000
     */
    private String current_ts;

    /**
     * Offset recorded in the OGG file; globally and monotonically increasing within the Kafka topic, e.g. 00000000010000112006
     */
    private String pos;

    /**
     * Primary key field names of the source record; multiple primary key fields form an array, field names uppercase, e.g. [\"ID\",\"BI_SFDM\"]
     */
    private List<String> primary_keys;
    /**
     * Concatenated primary key value
     */
    private String primarykeyValue;
    /**
     * Field values of the source record before the change; present only when op_type is U or D, e.g. {\"BI_SFDM\":\"03\",\"ID\":6,\"NAME\":\"test01\",\"VALUE\":\"TEST01\"}
     */
    private Map<String, Object> before;

    /**
     * Field values of the source record after the change; present only when op_type is I, U or DDL, e.g. {\"BI_SFDM\":\"03\",\"ID\":6,\"NAME\":\"test01\",\"VALUE\":\"TEST01\"}
     */
    private Map<String, Object> after;

}
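To make the structure concrete, a single CDC message body pulled from Kafka might look roughly like the following. This is a hypothetical example assembled from the example values in the field comments above; the table name and the changed values are illustrative only:

{
  "table": "SCHEMA.TEST_TABLE",
  "op_type": "U",
  "op_ts": "2021-02-04 12:17:49.00006",
  "current_ts": "2021-02-04 20:16:55.018000",
  "ts_ms": "1706274886276",
  "pos": "00000000010000112006",
  "primary_keys": ["ID", "BI_SFDM"],
  "before": {"BI_SFDM": "03", "ID": 6, "NAME": "test01", "VALUE": "TEST01"},
  "after": {"BI_SFDM": "03", "ID": 6, "NAME": "test02", "VALUE": "TEST02"}
}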

5. Developing the Data-Writing Sink

public class BaseSinkFunction extends RichSinkFunction<List<KafkaInputData>> {

    protected static Logger log = LoggerFactory.getLogger(BaseSinkFunction.class);

    /**
     * Application context
     */
    protected InputStreamSynContext inputStreamSynContext;
    /**
     * ES client; not serializable, so it must be transient
     */
    protected transient RestHighLevelClient esRestHighLevelClient;

    public BaseSinkFunction(InputStreamSynContext inputStreamSynContext) {
        this.inputStreamSynContext = inputStreamSynContext;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        this.esRestHighLevelClient = ElasticsearchUtil.initEsRestHighLevelClient(inputStreamSynContext.isDebug(), inputStreamSynContext.getEsClusterIp(),
                inputStreamSynContext.getEsUsername(), inputStreamSynContext.getEsPwd());
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (null != this.esRestHighLevelClient) {
            this.esRestHighLevelClient.close();
        }
    }

    @Override
    public void invoke(List<KafkaInputData> kafkaDataList, Context context) throws Exception {
        super.invoke(kafkaDataList, context);

        long startTime = System.currentTimeMillis();

        /************** Update this sink's own table first **************/
        this.updateTable(this.getSinkTable(), kafkaDataList, context);

        long endTime = System.currentTimeMillis();
    }

    public String getSinkTable() {
        return "";
    }

    /**
     * Update the data of a given table (index)
     *
     * @param tableName
     * @param kafkaDataList
     * @param context
     */
    public void updateTable(String tableName, List<KafkaInputData> kafkaDataList, Context context) {
        try {
            long startTime = System.currentTimeMillis();
            // Number of records per bulk batch
            final int batchSize = this.inputStreamSynContext.getEsBathSize();
            int totalSize = kafkaDataList.size();
            for (int i = 0; i < totalSize; i += batchSize) {
                // Use a fresh BulkRequest per batch, otherwise earlier batches would be re-sent
                BulkRequest bulkRequest = new BulkRequest();
                // Compute the end index of the current batch without exceeding the list size
                int batchEnd = Math.min(i + batchSize, totalSize);
                List<KafkaInputData> batchKafkaDataList = kafkaDataList.subList(i, batchEnd);
                for (KafkaInputData kafkaInputData : batchKafkaDataList) {
                    if (null == kafkaInputData || StringUtils.isEmpty(kafkaInputData.getOp_type())) {
                        // Skip invalid records instead of aborting the whole batch
                        continue;
                    }
                    // Build an insert/update/delete request according to the operation type
                    String primarykeyValue = kafkaInputData.getPrimarykeyValue();
                    Map<String, Object> data = kafkaInputData.getAfter();
                    log.info("Received data content: " + JsonUtil.toJson(data));
                    switch (EnumOpType.of(kafkaInputData.getOp_type())) {
                        case INSERT: // Index a document with the given ID; replaces it if it exists, otherwise creates it
                            IndexRequest indexRequest = new IndexRequest(tableName, "_doc").id(primarykeyValue).source(data);
                            bulkRequest.add(indexRequest);
                            break;
                        case DELETE: // Delete a document by ID
                            DeleteRequest deleteRequest = new DeleteRequest(tableName, "_doc", primarykeyValue);
                            bulkRequest.add(deleteRequest);
                            break;
                        case UPDATE: // Update a document by ID; only the given fields are changed
                            UpdateRequest updateRequest = new UpdateRequest(tableName, "_doc", primarykeyValue).doc(data);
                            bulkRequest.add(updateRequest);
                            break;
                        default:
                            break;
                    }
                }

                if (bulkRequest.numberOfActions() == 0) {
                    // Nothing to write in this batch
                    continue;
                }

                // WAIT_UNTIL blocks until the changes are visible to search
                bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
                BulkResponse bulkResponse = this.esRestHighLevelClient.bulk(bulkRequest, new BasicHeader("Content-Type", "application/json"));

                if (bulkResponse.hasFailures()) {
                    log.error("Bulk update of table {} failed: {}", tableName, bulkResponse.buildFailureMessage());
                } else {
                    log.info("Bulk update of table {} succeeded", tableName);
                }
            }

        } catch (Throwable e) {
            log.error("Bulk write to ES failed, index name: " + tableName + ", cause:", e);
        }
    }
}

Note:

  1. RestHighLevelClient implements Closeable and cannot be serialized, so the field must be declared transient.
  2. Because ES7 has removed multiple mapping types, when the ES6 client writes to an ES7 server the type on IndexRequest/DeleteRequest/UpdateRequest must be set to the single value "_doc".
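The sink above relies on two helper classes that the post does not show: EnumOpType and InputStreamSynContext. Below are minimal sketches inferred from how they are used; the actual implementations in the project may differ. First, an enum that maps the I/U/D/DDL codes described in section 4 to the switch branches used above:

public enum EnumOpType {
    INSERT("I"),
    UPDATE("U"),
    DELETE("D"),
    DDL("DDL"),
    UNKNOWN("");

    private final String code;

    EnumOpType(String code) {
        this.code = code;
    }

    // Resolve the op_type string from the Kafka message to an enum constant
    public static EnumOpType of(String code) {
        for (EnumOpType type : values()) {
            if (type.code.equalsIgnoreCase(code)) {
                return type;
            }
        }
        return UNKNOWN;
    }
}

Second, the context object that carries the job configuration; the field names are guesses based on the getters called in the sink and in the wiring code further below:

import java.io.Serializable;

import lombok.Data;

// Hypothetical configuration holder; Serializable because the sink keeps a non-transient reference to it
@Data
public class InputStreamSynContext implements Serializable {

    private static final long serialVersionUID = 1L;

    private boolean debug;
    private String esClusterIp;   // comma-separated host:port list
    private String esUsername;
    private String esPwd;
    private int esBathSize;       // bulk batch size, read via getEsBathSize()
    private int parallelism;
}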

6. Reading Data from Kafka

protected DataStream<String> source() {
    Properties properties = new Properties();
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, StringUtils.defaultString(offset, "latest"));
    // The consumer side needs deserializers (Flink's Kafka consumer supplies its own internally)
    properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    // TBDS SASL authentication settings
    properties.put("security.protocol", securityProtocol);
    properties.put("sasl.mechanism", saslMechanism);
    properties.put("sasl.tbds.secure.id", saslTbdsSecureId);
    properties.put("sasl.tbds.secure.key", saslTbdsSecureKey);

    // Subscribe by topic pattern; SimpleStringSchema passes the raw JSON string through
    FlinkKafkaConsumer010<String> flinkKafkaConsumer = new FlinkKafkaConsumer010<>(
        Pattern.compile(this.params.get(AppBaseConstant.GLOBAL_KAFKA_TOPICS)),
        new SimpleStringSchema(), properties);

    return streamExecutionEnv.addSource(flinkKafkaConsumer)
        .setParallelism(this.params.getInt(AppBaseConstant.GLOBAL_PARALLELISM, 1))
        .name("Consume data from Kafka");
  }

Note: the FlinkKafkaConsumer010 class must match the flink-connector-kafka artifact declared in the POM (here flink-connector-kafka-0.10_2.11).

7. Transforming the Kafka Data

  protected SingleOutputStreamOperator<KafkaInputData> transform(
      final DataStream<String> dataStream) {

    SingleOutputStreamOperator<KafkaInputData> transformDataStream = dataStream
        .map(new MapFunction<String, KafkaInputData>() {
          @Override
          public KafkaInputData map(String s) throws Exception {
            // Deserialize the raw JSON string into the KafkaInputData structure
            return JsonUtil.readValue(s, KafkaInputData.class);
          }
        }).map(new DataMapFunction()).name("Parallel: grid/province code conversion, lower-casing, field filtering, type conversion, field replacement")
        .setParallelism(this.inputStreamSynContext.getParallelism());
    return transformDataStream;
  }
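JsonUtil is another helper the post does not show. Since jackson-databind is already declared in the POM, a minimal sketch of the two methods used in this series (readValue and toJson) could look like the following; this is an assumption about the author's implementation, not the original code:

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;

public final class JsonUtil {

    private static final ObjectMapper MAPPER = new ObjectMapper()
            // Tolerate message fields that KafkaInputData does not declare
            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

    private JsonUtil() {
    }

    // Deserialize a JSON string into the given class
    public static <T> T readValue(String json, Class<T> clazz) throws Exception {
        return MAPPER.readValue(json, clazz);
    }

    // Serialize any object to a JSON string
    public static String toJson(Object value) {
        try {
            return MAPPER.writeValueAsString(value);
        } catch (Exception e) {
            throw new RuntimeException("Failed to serialize object to JSON", e);
        }
    }
}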

8. Writing the Data

protected void sink(SingleOutputStreamOperator<KafkaInputData> dataStream) {
    SingleOutputStreamOperator<List<KafkaInputData>> windowStream = dataStream
        .map(new MapFunction<KafkaInputData, List<KafkaInputData>>() {
          private static final long serialVersionUID = -8093040450345066252L;

          @Override
          public List<KafkaInputData> map(KafkaInputData kafkaInputData) throws Exception {
            // Wrap each record in a single-element list to match the sink's List<KafkaInputData> input type
            List<KafkaInputData> dataList = new ArrayList<>();
            dataList.add(kafkaInputData);
            return dataList;
          }
        }).setParallelism(this.inputStreamSynContext.getParallelism());
    windowStream.addSink(SinkFunctionFactory.create(inputStreamSynContext))
        .setParallelism(this.inputStreamSynContext.getParallelism())
        .name("Sink to ES");
  }
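The post does not show how the three steps are chained together, but presumably the job driver calls them in order. A minimal sketch under that assumption (the run() method name and the job name string are hypothetical; streamExecutionEnv is the same StreamExecutionEnvironment already used in source()):

public void run() throws Exception {
    // Section 6: consume raw JSON strings from Kafka
    DataStream<String> rawStream = this.source();
    // Section 7: parse and transform into KafkaInputData
    SingleOutputStreamOperator<KafkaInputData> transformed = this.transform(rawStream);
    // Section 8: wrap records and hand them to the ES sink
    this.sink(transformed);
    // Submit the job
    streamExecutionEnv.execute("Flink 1.7.2 Kafka to ES7 sync");
}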
