Preface
This article takes a look at ScalarFunction in the Flink Table API.
Example
    public class HashCode extends ScalarFunction {

        private int factor = 0;

        @Override
        public void open(FunctionContext context) throws Exception {
            // access "hashcode_factor" parameter
            // "12" would be the default value if parameter does not exist
            factor = Integer.valueOf(context.getJobParameter("hashcode_factor", "12"));
        }

        public int eval(String s) {
            return s.hashCode() * factor;
        }
    }

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

    // set job parameter
    Configuration conf = new Configuration();
    conf.setString("hashcode_factor", "31");
    env.getConfig().setGlobalJobParameters(conf);

    // register the function
    tableEnv.registerFunction("hashCode", new HashCode());

    // use the function in Java Table API
    myTable.select("string, string.hashCode(), hashCode(string)");

    // use the function in SQL
    tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable");
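- HashCode extends ScalarFunction and defines an eval method. Its open method reads the hashcode_factor job parameter and falls back to 12 when the parameter is not set.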
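To make the parameter lookup concrete, here is a minimal plain-Java sketch of the open/eval sequence, written without any Flink dependency. `HashCodeSketch` and the `Map` that stands in for the global job parameters are hypothetical names introduced for illustration; only the `hashcode_factor` key and the default of `12` come from the example above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the HashCode function above; it does not use Flink.
class HashCodeSketch {

    private int factor = 0;

    // Mirrors open(FunctionContext): read "hashcode_factor", defaulting to "12".
    // The Map is an assumed substitute for the global job parameters.
    void open(Map<String, String> jobParameters) {
        factor = Integer.parseInt(jobParameters.getOrDefault("hashcode_factor", "12"));
    }

    // Same computation as eval(String) in the example.
    int eval(String s) {
        return s.hashCode() * factor;
    }

    public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        params.put("hashcode_factor", "31");

        HashCodeSketch configured = new HashCodeSketch();
        configured.open(params);
        System.out.println(configured.eval("a")); // "a".hashCode() is 97, so this prints 3007

        HashCodeSketch defaulted = new HashCodeSketch();
        defaulted.open(new HashMap<>());
        System.out.println(defaulted.eval("a")); // falls back to factor 12, so this prints 1164
    }
}
```

In this sketch, calling eval before open leaves factor at 0, so every result would be 0. That is why the parameter is read during open.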
ScalarFunction
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/functions/ScalarFunction.scala
    abstract class ScalarFunction extends UserDefinedFunction {

      /**
        * Creates a call to a [[ScalarFunction]] in Scala Table API.
        *
        * @param params actual parameters of function
        * @return [[Expression]] in form of a [[ScalarFunctionCall]]
        */
      final def apply(params: Expression*): Expression = {
        ScalarFunctionCall(this, params)
      }

      // ----------------------------------------------------------------------------------------------

      /**
        * Returns the result type of the evaluation method with a given signature.
        *
        * This method needs to be overridden in case Flink's type extraction facilities are not
        * sufficient to extract the [[TypeInformation]] based on the return type of the evaluation
        * method. Flink's type extraction facilities can handle basic types or
        * simple POJOs but might be wrong for more complex, custom, or composite types.
        *
        * @param signature signature of the method the return type needs to be determined
        * @return [[TypeInformation]] of result type or null if Flink should determine the type
        */
      def getResultType(signature: Array[Class[_]]): TypeInformation[_] = null

      /**
        * Returns [[TypeInformation]] about the operands of the evaluation method with a given
        * signature.
        *
        * In order to perform operand type inference in SQL (especially when NULL is used) it might be
        * necessary to determine the parameter [[TypeInformation]] of an evaluation method.
        * By default Flink's type extraction facilities are used for this but might be wrong for
        * more complex, custom, or composite types.
        *
        * @param signature signature of the method the operand types need to be determined
        * @return [[TypeInformation]] of operand types
        */
      def getParameterTypes(signature: Array[Class[_]]): Array[TypeInformation[_]] = {
        signature.map { c =>
          try {
            TypeExtractor.getForClass(c)
          } catch {
            case ite: InvalidTypesException =>
              throw new ValidationException(
                s"Parameter types of scalar function '${this.getClass.getCanonicalName}' cannot be " +
                s"automatically determined. Please provide type information manually.")
          }
        }
      }
    }
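- ScalarFunction extends UserDefinedFunction and defines the apply, getResultType, and getParameterTypes methods. A ScalarFunction maps zero, one, or more scalar values to a new scalar value; the mapping is implemented by public eval methods that the user defines. It is also generally recommended to declare parameters and result types as primitive types, for example int in place of DATE/TIME and long in place of TIMESTAMP.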
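The two points above (evaluation methods are found by their signature, and primitive types are preferred for date and time values) can be sketched in plain Java. This is a simplified illustration and not Flink's own resolution logic: `EvalLookupSketch`, `DayOfWeekSketch`, `findEval`, and `call` are hypothetical names. The encodings of DATE as days since 1970-01-01 and TIMESTAMP as milliseconds since the epoch are assumptions made for this example.

```java
import java.lang.reflect.Method;
import java.time.LocalDate;
import java.util.Arrays;

class EvalLookupSketch {

    // Finds a method named "eval" whose parameter types equal the given signature.
    // getMethods() only returns public methods, matching the rule that eval must be public.
    static Method findEval(Class<?> functionClass, Class<?>... signature) {
        for (Method m : functionClass.getMethods()) {
            if (m.getName().equals("eval") && Arrays.equals(m.getParameterTypes(), signature)) {
                return m;
            }
        }
        throw new IllegalArgumentException("no matching eval method");
    }

    // Invokes the method and converts checked reflection errors to unchecked ones.
    static Object call(Method eval, Object function, Object... args) {
        try {
            return eval.invoke(function, args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        DayOfWeekSketch function = new DayOfWeekSketch();
        Method byDate = findEval(DayOfWeekSketch.class, int.class);
        Method byTimestamp = findEval(DayOfWeekSketch.class, long.class);
        System.out.println(call(byDate, function, 0));                // THURSDAY (1970-01-01)
        System.out.println(call(byTimestamp, function, 86_400_000L)); // FRIDAY (1970-01-02)
    }
}

// Hypothetical function with two overloaded evaluation methods.
// It is plain Java and does not extend Flink's ScalarFunction.
class DayOfWeekSketch {

    // DATE passed as an int, assumed here to be days since 1970-01-01.
    public String eval(int daysSinceEpoch) {
        return LocalDate.ofEpochDay(daysSinceEpoch).getDayOfWeek().toString();
    }

    // TIMESTAMP passed as a long, assumed here to be milliseconds since the epoch (UTC).
    public String eval(long millisSinceEpoch) {
        return eval((int) Math.floorDiv(millisSinceEpoch, 86_400_000L));
    }
}
```

The lookup distinguishes the overloads purely by parameter type, so `int.class` and `long.class` select different methods even though both are named eval.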
CRowProcessRunner
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/runtime/CRowProcessRunner.scala
    class CRowProcessRunner(
        name: String,
        code: String,
        @transient var returnType: TypeInformation[CRow])
      extends ProcessFunction[CRow, CRow]
      with ResultTypeQueryable[CRow]
      with Compiler[ProcessFunction[Row, Row]]
      with Logging {

      private var function: ProcessFunction[Row, Row] = _
      private var cRowWrapper: CRowWrappingCollector = _

      override def open(parameters: Configuration): Unit = {
        LOG.debug(s"Compiling ProcessFunction: $name \n\n Code:\n$code")
        val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
        LOG.debug("Instantiating ProcessFunction.")
        function = clazz.newInstance()
        FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
        FunctionUtils.openFunction(function, parameters)

        this.cRowWrapper = new CRowWrappingCollector()
      }

      override def processElement(
          in: CRow,
          ctx: ProcessFunction[CRow, CRow]#Context,
          out: Collector[CRow])
        : Unit = {

        cRowWrapper.out = out
        cRowWrapper.setChange(in.change)
        function.processElement(
          in.row,
          ctx.asInstanceOf[ProcessFunction[Row, Row]#Context],
          cRowWrapper)
      }

      override def getProducedType: TypeInformation[CRow] = returnType

      override def close(): Unit = {
        FunctionUtils.closeFunction(function)
      }
    }
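- CRowProcessRunner's processElement method calls function.processElement, which in turn calls the eval method of the user-defined ScalarFunction. Here function is a ProcessFunction whose source (code) is passed in as a CRowProcessRunner constructor argument. That code is generated by DataStreamCalc in its translateToPlan method when it creates the CRowProcessRunner, and it is compiled and instantiated in CRowProcessRunner's open method.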
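The delegation in processElement can be sketched in plain Java as follows. All types here are hypothetical stand-ins rather than Flink classes: `ChangeRow` plays the role of CRow, `WrappingCollector` the role of CRowWrappingCollector, and a `BiConsumer` replaces the compiled ProcessFunction[Row, Row]. Reading the change flag as true for accumulate and false for retract is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Plain-Java sketch of the delegation in CRowProcessRunner.processElement.
class RunnerSketch {

    // Stand-in for CRow: a row plus a change flag.
    static final class ChangeRow {
        final Object[] row;
        final boolean change;

        ChangeRow(Object[] row, boolean change) {
            this.row = row;
            this.change = change;
        }
    }

    // Stand-in for CRowWrappingCollector: re-attaches the change flag to each emitted row.
    static final class WrappingCollector implements Consumer<Object[]> {
        Consumer<ChangeRow> out;
        boolean change;

        @Override
        public void accept(Object[] row) {
            out.accept(new ChangeRow(row, change));
        }
    }

    // Stand-in for the compiled row-level function: consumes a row, emits rows.
    private final BiConsumer<Object[], Consumer<Object[]>> function;
    private final WrappingCollector wrapper = new WrappingCollector();

    RunnerSketch(BiConsumer<Object[], Consumer<Object[]>> function) {
        this.function = function;
    }

    // Unwraps the input, delegates to the inner function, and lets the wrapper
    // copy the input's change flag onto whatever the inner function emits.
    void processElement(ChangeRow in, Consumer<ChangeRow> out) {
        wrapper.out = out;
        wrapper.change = in.change;
        function.accept(in.row, wrapper);
    }

    public static void main(String[] args) {
        // Assumed example function: appends the length of the first field.
        RunnerSketch runner = new RunnerSketch(
                (row, collector) -> collector.accept(new Object[] {row[0], ((String) row[0]).length()}));

        List<ChangeRow> results = new ArrayList<>();
        runner.processElement(new ChangeRow(new Object[] {"abc"}, false), results::add);

        ChangeRow first = results.get(0);
        System.out.println(first.row[0] + ", " + first.row[1] + ", change=" + first.change);
        // prints: abc, 3, change=false
    }
}
```

The inner function never sees the change flag. It works on plain rows, and the wrapper restores the flag on output.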
ProcessFunction
flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/functions/ProcessFunction.java
    @PublicEvolving
    public abstract class ProcessFunction<I, O> extends AbstractRichFunction {

        private static final long serialVersionUID = 1L;

        /**
         * Process one element from the input stream.
         *
         * <p>This function can output zero or more elements using the {@link Collector} parameter
         * and also update internal state or set timers using the {@link Context} parameter.
         *
         * @param value The input value.
         * @param ctx A {@link Context} that allows querying the timestamp of the element and getting
         *            a {@link TimerService} for registering timers and querying the time. The
         *            context is only valid during the invocation of this method, do not store it.
         * @param out The collector for returning result values.
         *
         * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
         *                   to fail and may trigger recovery.
         */
        public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;

        /**
         * Called when a timer set using {@link TimerService} fires.
         *
         * @param timestamp The timestamp of the firing timer.
         * @param ctx An {@link OnTimerContext} that allows querying the timestamp of the firing timer,
         *            querying the {@link TimeDomain} of the firing timer and getting a
         *            {@link TimerService} for registering timers and querying the time.
         *            The context is only valid during the invocation of this method, do not store it.
         * @param out The collector for returning result values.
         *
         * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
         *                   to fail and may trigger recovery.
         */
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}

        /**
         * Information available in an invocation of {@link #processElement(Object, Context, Collector)}
         * or {@link #onTimer(long, OnTimerContext, Collector)}.
         */
        public abstract class Context {

            /**
             * Timestamp of the element currently being processed or timestamp of a firing timer.
             *
             * <p>This might be {@code null}, for example if the time characteristic of your program
             * is set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
             */
            public abstract Long timestamp();

            /**
             * A {@link TimerService} for querying time and registering timers.
             */
            public abstract TimerService timerService();

            /**
             * Emits a record to the side output identified by the {@link OutputTag}.
             *
             * @param outputTag the {@code OutputTag} that identifies the side output to emit to.
             * @param value The record to emit.
             */
            public abstract <X> void output(OutputTag<X> outputTag, X value);
        }

        /**
         * Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.
         */
        public abstract class OnTimerContext extends Context {
            /**
             * The {@link TimeDomain} of the firing timer.
             */
            public abstract TimeDomain timeDomain();
        }

    }
- ProcessFunction extends AbstractRichFunction and declares the abstract method processElement.
DataStreamCalc
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
    class DataStreamCalc(
        cluster: RelOptCluster,
        traitSet: RelTraitSet,
        input: RelNode,
        inputSchema: RowSchema,
        schema: RowSchema,
        calcProgram: RexProgram,
        ruleDescription: String)
      extends Calc(cluster, traitSet, input, calcProgram)
      with CommonCalc
      with DataStreamRel {

      //......

      override def translateToPlan(
          tableEnv: StreamTableEnvironment,
          queryConfig: StreamQueryConfig): DataStream[CRow] = {

        val config = tableEnv.getConfig

        val inputDataStream =
          getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig)

        // materialize time attributes in condition
        val condition = if (calcProgram.getCondition != null) {
          val materializedCondition = RelTimeIndicatorConverter.convertExpression(
            calcProgram.expandLocalRef(calcProgram.getCondition),
            inputSchema.relDataType,
            cluster.getRexBuilder)
          Some(materializedCondition)
        } else {
          None
        }

        // filter out time attributes
        val projection = calcProgram.getProjectList.asScala
          .map(calcProgram.expandLocalRef)

        val generator = new FunctionCodeGenerator(config, false, inputSchema.typeInfo)

        val genFunction = generateFunction(
          generator,
          ruleDescription,
          inputSchema,
          schema,
          projection,
          condition,
          config,
          classOf[ProcessFunction[CRow, CRow]])

        val inputParallelism = inputDataStream.getParallelism

        val processFunc = new CRowProcessRunner(
          genFunction.name,
          genFunction.code,
          CRowTypeInfo(schema.typeInfo))

        inputDataStream
          .process(processFunc)
          .name(calcOpName(calcProgram, getExpressionString))
          // keep parallelism to ensure order of accumulate and retract messages
          .setParallelism(inputParallelism)
      }
    }
- DataStreamCalc's translateToPlan calls CommonCalc's generateFunction method to produce genFunction, then creates a CRowProcessRunner from genFunction.name, genFunction.code, and a CRowTypeInfo. The generated code extends ProcessFunction and implements processElement, which calls the eval method of the user-defined ScalarFunction.
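As a rough picture of what the generated function does for each input row, here is a hand-written plain-Java approximation of the query `SELECT string, HASHCODE(string) FROM MyTable`. The code Flink actually generates looks different. `CalcSketch` and `HashCodeFn` are hypothetical names, and modelling the optional condition as a `Predicate` and the projection as a list of `Function`s is a simplification chosen for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Hand-written approximation of a Calc function: apply the optional condition,
// then evaluate one projection expression per output column.
class CalcSketch {

    // Stand-in for the user-defined scalar function registered as "hashCode".
    static final class HashCodeFn {
        private final int factor;

        HashCodeFn(int factor) {
            this.factor = factor;
        }

        public int eval(String s) {
            return s.hashCode() * factor;
        }
    }

    private final Predicate<Object[]> condition;               // null means no filter
    private final List<Function<Object[], Object>> projection; // one entry per output column

    CalcSketch(Predicate<Object[]> condition, List<Function<Object[], Object>> projection) {
        this.condition = condition;
        this.projection = projection;
    }

    // Emits at most one output row for each input row.
    void processElement(Object[] in, List<Object[]> out) {
        if (condition != null && !condition.test(in)) {
            return; // row rejected by the condition
        }
        Object[] result = new Object[projection.size()];
        for (int i = 0; i < result.length; i++) {
            result[i] = projection.get(i).apply(in);
        }
        out.add(result);
    }

    public static void main(String[] args) {
        HashCodeFn udf = new HashCodeFn(31);

        // Corresponds to: SELECT string, HASHCODE(string) FROM MyTable
        List<Function<Object[], Object>> projection = new ArrayList<>();
        projection.add(row -> row[0]);
        projection.add(row -> udf.eval((String) row[0])); // the call into the user's eval method

        CalcSketch calc = new CalcSketch(null, projection);
        List<Object[]> out = new ArrayList<>();
        calc.processElement(new Object[] {"a"}, out);
        System.out.println(out.get(0)[0] + ", " + out.get(0)[1]); // prints: a, 3007
    }
}
```

The second projection entry is where the user's eval method is invoked, once per input row that passes the condition.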
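Summary
- ScalarFunction extends UserDefinedFunction and defines the apply, getResultType, and getParameterTypes methods. A ScalarFunction maps zero, one, or more scalar values to a new scalar value; the mapping is implemented by public eval methods that the user defines. It is also generally recommended to declare parameters and result types as primitive types, for example int in place of DATE/TIME and long in place of TIMESTAMP.
- CRowProcessRunner's processElement method calls function.processElement, which in turn calls the eval method of the user-defined ScalarFunction. Here function is a ProcessFunction whose source (code) is passed in as a CRowProcessRunner constructor argument and is generated by DataStreamCalc in its translateToPlan method when it creates the CRowProcessRunner. ProcessFunction extends AbstractRichFunction and declares the abstract method processElement.
- DataStreamCalc's translateToPlan calls CommonCalc's generateFunction method to produce genFunction, then creates a CRowProcessRunner from genFunction.name, genFunction.code, and a CRowTypeInfo. The generated code extends ProcessFunction and implements processElement, which calls the eval method of the user-defined ScalarFunction.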
doc
- Integrating UDFs with the Runtime