专业的编程技术博客社区

网站首页 > 博客文章 正文

聊聊flink Table的ScalarFunction

baijin 2024-10-03 17:36:35 博客文章 8 ℃ 0 评论

本文主要研究一下flink Table的ScalarFunction

实例

public class HashCode extends ScalarFunction {
?
 private int factor = 0;
?
 @Override
 public void open(FunctionContext context) throws Exception {
 // access "hashcode_factor" parameter
 // "12" would be the default value if parameter does not exist
 factor = Integer.valueOf(context.getJobParameter("hashcode_factor", "12")); 
 }
?
 public int eval(String s) {
 return s.hashCode() * factor;
 }
}
// Usage example: set the job parameter, register the function, then call it.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// set job parameter
Configuration conf = new Configuration();
conf.setString("hashcode_factor", "31");
env.getConfig().setGlobalJobParameters(conf);

// register the function
tableEnv.registerFunction("hashCode", new HashCode());

// use the function in Java Table API
// NOTE: myTable is not defined in this snippet; presumably a Table with a String
// column named "string" - confirm against the surrounding program.
myTable.select("string, string.hashCode(), hashCode(string)");

// use the function in SQL
tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable");
  • HashCode继承了ScalarFunction,它重写了open方法(通过FunctionContext.getJobParameter读取hashcode_factor参数,缺省值为12),并定义了eval方法

ScalarFunction

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/functions/ScalarFunction.scala

/**
  * Base class for user-defined scalar functions.
  *
  * Besides the Scala Table API entry point [[apply]], it exposes two hooks,
  * [[getResultType]] and [[getParameterTypes]], that subclasses may override to supply
  * type information manually.
  */
abstract class ScalarFunction extends UserDefinedFunction {

  /**
    * Creates a call to a [[ScalarFunction]] in Scala Table API.
    *
    * @param params actual parameters of function
    * @return [[Expression]] in form of a [[ScalarFunctionCall]]
    */
  final def apply(params: Expression*): Expression = {
    ScalarFunctionCall(this, params)
  }

  // ----------------------------------------------------------------------------------------------

  /**
    * Returns the result type of the evaluation method with a given signature.
    *
    * This method needs to be overridden in case Flink's type extraction facilities are not
    * sufficient to extract the [[TypeInformation]] based on the return type of the evaluation
    * method. Flink's type extraction facilities can handle basic types or
    * simple POJOs but might be wrong for more complex, custom, or composite types.
    *
    * @param signature signature of the method the return type needs to be determined
    * @return [[TypeInformation]] of result type or null if Flink should determine the type
    */
  def getResultType(signature: Array[Class[_]]): TypeInformation[_] = null

  /**
    * Returns [[TypeInformation]] about the operands of the evaluation method with a given
    * signature.
    *
    * In order to perform operand type inference in SQL (especially when NULL is used) it might be
    * necessary to determine the parameter [[TypeInformation]] of an evaluation method.
    * By default Flink's type extraction facilities are used for this but might be wrong for
    * more complex, custom, or composite types.
    *
    * @param signature signature of the method the operand types need to be determined
    * @return [[TypeInformation]] of operand types
    * @throws ValidationException if the type of any parameter class cannot be extracted
    */
  def getParameterTypes(signature: Array[Class[_]]): Array[TypeInformation[_]] = {
    signature.map { c =>
      try {
        TypeExtractor.getForClass(c)
      } catch {
        case ite: InvalidTypesException =>
          // Keep the extraction failure as the cause so the root problem is not lost.
          throw new ValidationException(
            s"Parameter types of scalar function '${this.getClass.getCanonicalName}' cannot be " +
              s"automatically determined. Please provide type information manually.",
            ite)
      }
    }
  }
}
  • ScalarFunction继承了UserDefinedFunction,它定义了apply、getResultType、getParameterTypes方法;ScalarFunction主要用于将零个/一个/多个scalar值map成新的scalar值,其map行为由用户自定义的public的eval方法来实现;另外一般建议使用原始类型作为declare parameters或者result types,比如用int替代DATE/TIME,用long替代TIMESTAMP

CRowProcessRunner

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/runtime/CRowProcessRunner.scala

/**
  * [[ProcessFunction]] over [[CRow]] that compiles a generated `ProcessFunction[Row, Row]`
  * from source code in `open` and delegates every element to it.
  *
  * @param name       name under which the generated function is compiled
  * @param code       source code of the generated function
  * @param returnType type information returned by `getProducedType`
  */
class CRowProcessRunner(
    name: String,
    code: String,
    @transient var returnType: TypeInformation[CRow])
  extends ProcessFunction[CRow, CRow]
  with ResultTypeQueryable[CRow]
  with Compiler[ProcessFunction[Row, Row]]
  with Logging {

  // Both fields are assigned in open().
  private var function: ProcessFunction[Row, Row] = _
  private var cRowWrapper: CRowWrappingCollector = _

  /**
    * Compiles and instantiates the generated function, hands it the runtime context,
    * opens it, and creates the collector wrapper used by `processElement`.
    */
  override def open(parameters: Configuration): Unit = {
    LOG.debug(s"Compiling ProcessFunction: $name \n\n Code:\n$code")
    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
    LOG.debug("Instantiating ProcessFunction.")
    function = clazz.newInstance()
    FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
    FunctionUtils.openFunction(function, parameters)

    this.cRowWrapper = new CRowWrappingCollector()
  }

  /**
    * Passes the wrapped row of `in` to the generated function. Output goes through
    * `cRowWrapper`, which is pointed at `out` and given the change flag of `in` first.
    */
  override def processElement(
      in: CRow,
      ctx: ProcessFunction[CRow, CRow]#Context,
      out: Collector[CRow])
    : Unit = {

    cRowWrapper.out = out
    cRowWrapper.setChange(in.change)
    function.processElement(
      in.row,
      ctx.asInstanceOf[ProcessFunction[Row, Row]#Context],
      cRowWrapper)
  }

  override def getProducedType: TypeInformation[CRow] = returnType

  /** Closes the generated function. */
  override def close(): Unit = {
    FunctionUtils.closeFunction(function)
  }
}
  • CRowProcessRunner的processElement方法调用了function.processElement,而function.processElement会去调用用户定义的ScalarFunction的eval方法;这里的function继承了ProcessFunction,它的code为CRowProcessRunner的构造器参数,由DataStreamCalc在translateToPlan方法中创建CRowProcessRunner的时候生成

ProcessFunction

flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/functions/ProcessFunction.java

@PublicEvolving
public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
?
 private static final long serialVersionUID = 1L;
?
 /**
 * Process one element from the input stream.
 *
 * <p>This function can output zero or more elements using the {@link Collector} parameter
 * and also update internal state or set timers using the {@link Context} parameter.
 *
 * @param value The input value.
 * @param ctx A {@link Context} that allows querying the timestamp of the element and getting
 * a {@link TimerService} for registering timers and querying the time. The
 * context is only valid during the invocation of this method, do not store it.
 * @param out The collector for returning result values.
 *
 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
 * to fail and may trigger recovery.
 */
 public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
?
 /**
 * Called when a timer set using {@link TimerService} fires.
 *
 * @param timestamp The timestamp of the firing timer.
 * @param ctx An {@link OnTimerContext} that allows querying the timestamp of the firing timer,
 * querying the {@link TimeDomain} of the firing timer and getting a
 * {@link TimerService} for registering timers and querying the time.
 * The context is only valid during the invocation of this method, do not store it.
 * @param out The collector for returning result values.
 *
 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
 * to fail and may trigger recovery.
 */
 public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
?
 /**
 * Information available in an invocation of {@link #processElement(Object, Context, Collector)}
 * or {@link #onTimer(long, OnTimerContext, Collector)}.
 */
 public abstract class Context {
?
 /**
 * Timestamp of the element currently being processed or timestamp of a firing timer.
 *
 * <p>This might be {@code null}, for example if the time characteristic of your program
 * is set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
 */
 public abstract Long timestamp();
?
 /**
 * A {@link TimerService} for querying time and registering timers.
 */
 public abstract TimerService timerService();
?
 /**
 * Emits a record to the side output identified by the {@link OutputTag}.
 *
 * @param outputTag the {@code OutputTag} that identifies the side output to emit to.
 * @param value The record to emit.
 */
 public abstract <X> void output(OutputTag<X> outputTag, X value);
 }
?
 /**
 * Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.
 */
 public abstract class OnTimerContext extends Context {
 /**
 * The {@link TimeDomain} of the firing timer.
 */
 public abstract TimeDomain timeDomain();
 }
?
}
  • ProcessFunction继承了AbstractRichFunction,它定义了抽象方法processElement以及默认为空实现的onTimer方法;另外还定义了Context及OnTimerContext两个抽象内部类,其中OnTimerContext继承了Context并增加了timeDomain方法

DataStreamCalc

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala

/**
  * DataStream relational node for a [[Calc]] (projection plus optional condition, both
  * taken from `calcProgram`).
  *
  * `translateToPlan` generates a `ProcessFunction` from the program and runs it inside a
  * [[CRowProcessRunner]] on the translated input stream.
  */
class DataStreamCalc(
    cluster: RelOptCluster,
    traitSet: RelTraitSet,
    input: RelNode,
    inputSchema: RowSchema,
    schema: RowSchema,
    calcProgram: RexProgram,
    ruleDescription: String)
  extends Calc(cluster, traitSet, input, calcProgram)
  with CommonCalc
  with DataStreamRel {

  //......

  /**
    * Translates this node into a [[DataStream]] of [[CRow]].
    *
    * @param tableEnv    table environment supplying the table config
    * @param queryConfig query configuration forwarded to the input node
    * @return the input stream with the generated calc function applied
    */
  override def translateToPlan(
      tableEnv: StreamTableEnvironment,
      queryConfig: StreamQueryConfig): DataStream[CRow] = {

    val config = tableEnv.getConfig

    val inputDataStream =
      getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig)

    // materialize time attributes in condition
    val condition = if (calcProgram.getCondition != null) {
      val materializedCondition = RelTimeIndicatorConverter.convertExpression(
        calcProgram.expandLocalRef(calcProgram.getCondition),
        inputSchema.relDataType,
        cluster.getRexBuilder)
      Some(materializedCondition)
    } else {
      None
    }

    // expand the local references of the projection list into full expressions
    val projection = calcProgram.getProjectList.asScala
      .map(calcProgram.expandLocalRef)

    val generator = new FunctionCodeGenerator(config, false, inputSchema.typeInfo)

    // generate the name and source code of the function that evaluates projection and condition
    val genFunction = generateFunction(
      generator,
      ruleDescription,
      inputSchema,
      schema,
      projection,
      condition,
      config,
      classOf[ProcessFunction[CRow, CRow]])

    val inputParallelism = inputDataStream.getParallelism

    // the runner receives only name and code; it compiles them in its open() method
    val processFunc = new CRowProcessRunner(
      genFunction.name,
      genFunction.code,
      CRowTypeInfo(schema.typeInfo))

    inputDataStream
      .process(processFunc)
      .name(calcOpName(calcProgram, getExpressionString))
      // keep parallelism to ensure order of accumulate and retract messages
      .setParallelism(inputParallelism)
  }
}
  • DataStreamCalc的translateToPlan调用了CommonCalc的generateFunction方法,生成了genFunction,之后通过genFunction.name,genFunction.code及CRowTypeInfo创建了CRowProcessRunner;生成的code继承了ProcessFunction,实现了processElement方法,该方法会去调用用户定义的ScalarFunction的eval方法

小结

  • ScalarFunction继承了UserDefinedFunction,它定义了apply、getResultType、getParameterTypes方法;ScalarFunction主要用于将零个/一个/多个scalar值map成新的scalar值,其map行为由用户自定义的public的eval方法来实现;另外一般建议使用原始类型作为declare parameters或者result types,比如用int替代DATE/TIME,用long替代TIMESTAMP
  • CRowProcessRunner的processElement方法调用了function.processElement,而function.processElement会去调用用户定义的ScalarFunction的eval方法;这里的function继承了ProcessFunction,它的code为CRowProcessRunner的构造器参数,由DataStreamCalc在translateToPlan方法中创建CRowProcessRunner的时候生成;ProcessFunction继承了AbstractRichFunction,它定义了抽象方法processElement
  • DataStreamCalc的translateToPlan调用了CommonCalc的generateFunction方法,生成了genFunction,之后通过genFunction.name,genFunction.code及CRowTypeInfo创建了CRowProcessRunner;生成的code继承了ProcessFunction,实现了processElement方法,该方法会去调用用户定义的ScalarFunction的eval方法

doc

  • Integrating UDFs with the Runtime

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表