标签

 flink 

相关的文章:

本列表页提供了关于 Flink 实时数据处理与优化实践的文章,包括构建实时数仓、优化 Flink Keyed State、流批一体应用、高效接入 Flink 等内容。

Reading《Stream Processing with Apache Flink》-2nd

原文英文,约1700词,阅读约需7分钟。发表于:

1 Chapter7: Stateful Operators And Applications 1.1 Implementing Stateful Functions Keyed State: ValueState[T]: single value. The value can be read using ValueState.value() and updated with ValueState.update(value: T) ListState[T]: list of elements. 常用接口有add addAll get update MapState[K, V]: map of keys and values. get put contains remove ReducingState[T]: 类似 ListState, 但是不存储全部 list,而是 immediately aggregates value using a ReduceFunction AggregatingState[I, O]: 类似 reduce 和 aggregate 的关系,更加通用化 val sensorData: DataStream[SensorReading] = ??? // partition and key the stream on the sensor ID val keyedData: KeyedStream[SensorReading, String] = sensorData .keyBy(_.id) // apply a stateful FlatMapFunction on the keyed stream which // compares the temperature readings and raises alerts val alerts: DataStream[(String, Double, Double)] = keyedData .flatMap(new TemperatureAlertFunction(1.7)) class TemperatureAlertFunction(val threshold: Double) extends RichFlatMapFunction[SensorReading, (String, Double, Double)] { // the state handle object private var lastTempState: ValueState[Double] = _ override def open(parameters: Configuration): Unit = { // create state descriptor val lastTempDescriptor = new ValueStateDescriptor[Double]("lastTemp", classOf[Double]) // obtain the state handle lastTempState = getRuntimeContext.getState[Double](lastTempDescriptor) } override def flatMap( reading: SensorReading, out: Collector[(String, Double, Double)]): Unit = { // fetch the last temperature from state val lastTemp = lastTempState.value() // check if we need to emit an alert val tempDiff = (reading.temperature - lastTemp).abs if (tempDiff > threshold) { // temperature changed by more than the threshold out.collect((reading.id, reading.temperature, tempDiff)) } // update lastTemp state this.lastTempState.update(reading.temperature) } } State 存储时使用 Flink 的TypeInformation(序列化、反序列化) StateDescriptor 是函数从 StateBackend 获取/注册 State 的描述符 Operator List State: 可以继承 public interface ListCheckpointed<T extends Serializable> { List<T> snapshotState(long checkpointId, long timestamp) throws Exception; void restoreState(List<T> state) throws Exception; } 注意:该接口已经标记 @Deprecated, 建议使用 CheckpointedFunction Broadcast State: 典型的场景是:a stream of rules and a stream of events on which the rules are applied, 即 事件流 和 规则流。 val sensorData: DataStream[SensorReading] = ??? val thresholds: DataStream[ThresholdUpdate] = ??? val keyedSensorData: KeyedStream[SensorReading, String] = sensorData.keyBy(_.id) // the descriptor of the broadcast state val broadcastStateDescriptor = new MapStateDescriptor[String, Double]( "thresholds", classOf[String], classOf[Double]) val broadcastThresholds: BroadcastStream[ThresholdUpdate] = thresholds .broadcast(broadcastStateDescriptor) // connect keyed sensor stream and broadcasted rules stream val alerts: DataStream[(String, Double, Double)] = keyedSensorData .connect(broadcastThresholds) .process(new UpdatableTemperatureAlertFunction()) 注意 Broadcast events 可能乱序。 CheckpointedFunction, CheckpointListener跟 checkpoint 紧密相关,前者在触发 checkpoint 时调用,可以定义各类 State,例如ValueState ListState等,后者则注册了 checkpoint 完成时的回调。 1.2 Enabling Failure Recovery for Stateful Applications 1.3 Ensuring the Maintainability of Stateful Applications 任务会经常变动:Bugs need to be fixed, functionality adjusted, added, or removed, or the parallelism of the operator needs to be adjusted to account for higher or lower data rates. 为了确保任务的可维护性,关于 state 有两点需要注意: Specifying Unique Operator Identifiers : 最好从程序开始就为每个 operator 指定 Defining the Maximum Parallelism of Keyed State Operators: setMaxParallelism在这里更确切的作用是setCountOfKeyGroups 1.4 Performance and Robustness of Stateful Applications StateBackend: MemoryStateBackend, the FsStateBackend, and the RocksDBStateBackend. 使用 RocksDBStateBackend 时,不同 State 类型性能差别较大。比如 MapState[X, Y]比ValueState[HashMap[X, Y]]性能更高,ListState[X]比ValueState[List[X]]更适合频繁追加数据的场景。 滥用 state 会导致 state 过大的问题,比如 KeyedStream.aggregate 而 key 无限制,典型的比如统计用户行为时的 sessionId. 使用 timer 清理 state,确保 state 不会引发问题。例如: class SelfCleaningTemperatureAlertFunction(val threshold: Double) extends KeyedProcessFunction[String, SensorReading, (String, Double, Double)] { // the keyed state handle for the last temperature private var lastTempState: ValueState[Double] = _ // the keyed state handle for the last registered timer private var lastTimerState: ValueState[Long] = _ override def open(parameters: Configuration): Unit = { // register state for last temperature val lastTempDesc = new ValueStateDescriptor[Double]("lastTemp", classOf[Double]) lastTempState = getRuntimeContext.getState[Double](lastTempDescriptor) // register state for last timer val lastTimerDesc = new ValueStateDescriptor[Long]("lastTimer", classOf[Long]) lastTimerState = getRuntimeContext.getState(timestampDescriptor) } override def processElement( reading: SensorReading, ctx: KeyedProcessFunction [String, SensorReading, (String, Double, Double)]#Context, out: Collector[(String, Double, Double)]): Unit = { // compute timestamp of new clean up timer as record timestamp + one hour val newTimer = ctx.timestamp() + (3600 * 1000) // get timestamp of current timer val curTimer = lastTimerState.value() // delete previous timer and register new timer ctx.timerService().deleteEventTimeTimer(curTimer) ctx.timerService().registerEventTimeTimer(newTimer) // update timer timestamp state lastTimerState.update(newTimer) // fetch the last temperature from state val lastTemp = lastTempState.value() // check if we need to emit an alert val tempDiff = (reading.temperature - lastTemp).abs if (tempDiff > threshold) { // temperature increased by more than the threshold out.collect((reading.id, reading.temperature, tempDiff)) } // update lastTemp state this.lastTempState.update(reading.temperature) } override def onTimer( timestamp: Long, ctx: KeyedProcessFunction [String, SensorReading, (String, Double, Double)]#OnTimerContext, out: Collector[(String, Double, Double)]): Unit = { // clear all state for the key lastTempState.clear() lastTimerState.clear() } } 1.5 Evolving Stateful Applications Updating an Application without Modifying Existing State: compatible Changing the Input Data Type of Built-in Stateful Operators: not compatible Removing State from an Application: 默认 avoid losing state,可以关闭 Modifying the State of an Operator: 比如ValueState[String]修改为ValueState[Double],兼容不全,尽量避免。 1.6 Queryable State 支持 state 的点查和读取,依赖 flink-queryable-state-client-java. 注:之前调研 flink 时,这个功能看上去非常强大,不过目前在官网已经看不到相关文档了。 2 Chapter8: Reading From and Write to External Systems 2.1 Application Consistency Guarantees 如果想不丢数据,source 需要是 resettable 的,例如读文件时 File ByteStream 的 offset,读 kafka 时 TopicPartition 的 offset. 但是如果想要 end-to-end exactly-once, sink connectors 还需要支持 idempotent writes or transactional writes. 后者比如 write-ahead-log (WAL) sink , two-phase-commit (2PC) sink.   Nonresettable source Resettable source Any Sink At-most-once At-least-once Idempotent sink At-most-once Exactly-once*(temporary inconsistencies during recovery) WAL sink At-most-once At-least-once 2PC sink At-most-once Exactly-once 注意 WAL sink 即使仅在 checkpoint complete 完成的时候 sink,也无法作答 Exactly-once.s 2.2 Provided Connectors Kafka, Filesystem, etc. 官网比书里已经更详细了。 2.3 Implementing a Custom Source Function SourceFunction and RichSourceFunction can be used to define nonparallel source connectors—sources that run with a single task. ParallelSourceFunction and RichParallelSourceFunction can be used to define source connectors that run with multiple parallel task instances. 注:接口后来有变化 当 checkpoint 进行的时候,需要记录此时的 offset, 就需要避免SourceFunction.run()emit data. 换句话说CheckpointedFunction.snapshotState和该方法,只能同时在执行一个。 需要注意 sourceFunction 某个 parallelism idle 时不会发出 watermark,可能导致整个任务在等待的情形。 2.4 Implementing a Custom Sink Function Idempotent Sink Connectors: 要求结果数据有 deterministic (composite) key,存储支持 Transactional Sink Connectors: GenericWriteAheadSink: 先写 WAL,收到 CheckpointCompleted 时写入到存储。听上去似乎很完美,但是实际上只能做到 At-least-once,有两种情况:存储的批量写入不是原子的;存储写入成功,但是 commit checkpoint 时失败。 TwoPhaseCommitSinkFunction sink operator 收到 checkpoint barrier:persists its state, prepares the current transaction for committing, and acknowledges the checkpoint at the JobManager. JobManager 收到所有 task instances 的 successful checkpoint notifications sink operator 收到 checkpoint completed 消息:commits all open transactions of previous checkpoints. 我理解 commit 确保了持久化, 如果 commit 失败的话,preCommit 的操作会被回滚,确保不会对 storage system 产生影响,因而保证了 Exactly-once 语义。书里有一个TransactionalFileSink的例子,很直观。当然支持该语义带来的问题也需要注意,一是 checkpoint 完成后数据才可见;二是对 kafka transaction timeout 调优,避免一直 commit 失败导致可能的数据丢失。 2.5 Asynchronously Accessing External Systems 异步查询词典的场景 3 Chapter9: Setting Up Flink for Streaming Applications 3.1 Deployment Modes Standalone Cluster: 启动: 提交: Docker YARN: JobMode: SessionMode: 启动: 提交: 注:ApplicationMode Kubernetes: 生产环境的目标状态应当还是容器化部署 3.2 Highly Available Setups HA 的目的是 as little downtime as possible. TaskManager 失败可以由 ResourceManager 恢复,JobManager 失败则依赖于 HA 部署。 HA 需要考虑的存储有:JAR file, the JobGraph, and pointers to completed checkpoints. 书里介绍了 ZooKeeper HA Services,当前还有 Kubernetes HA Services. 实践经验里看还是有些坑的,尤其是 Yarn 相关参数。 3.3 Integration with Hadoop Components 3.4 Filesystem Configuration 3.5 System Configuration Java and Classloading: Flink 提供了 User-code class loaders, 注意 classloader.resolve-order 相关的配置。 CPU: task 在 TaskManager 的线程运行,以 slot 的方式对外提供。 Main Memory and Network Buffers: JM TM 内存重点不同,额外注意 network buffer 和 rocksdb backend. Disk Storage Checkpointing and State Backends Security 4 Chapter10: Operating Flink and Streaming Applications 4.1 Running and Managing Streaming Applications 这节提到了“maintaining streaming applications is more challenging than maintaining batch applications”,我个人觉得对于 streaming applications,maintaining 比 develop 更具挑战性。maintaining = start, stop, pause and resume, scale, and upgrade. 操作 flink 任务可以使用 CLI 或者 REST API. savepoint 相关功能最好通过uid()定义 Unique Operator IDs. Kubernetes 相关的内容已经过时,建议直接参考文档。 4.2 Controlling Task Scheduling Task Chaining 可以将网络通信转为线程内方法的直接调用,因此 Flink 默认开启,如有必要可以通过disableChaining, startNewChain调优。 Slot-Sharing Groups 允许用户自己协调计算密集和 IO 密集的 task: // slot-sharing group "green" val a: DataStream[A] = env.createInput(...) .slotSharingGroup("green") .setParallelism(4) val b: DataStream[B] = a.map(...) // slot-sharing group "green" is inherited from a .setParallelism(4) // slot-sharing group "yellow" val c: DataStream[C] = env.createInput(...) .slotSharingGroup("yellow") .setParallelism(2) // slot-sharing group "blue" val d: DataStream[D] = b.connect(c.broadcast(...)).process(...) .slotSharingGroup("blue") .setParallelism(4) val e = d.addSink() // slot-sharing group "blue" is inherited from d .setParallelism(2) 如上代码,不同 task 分配的效果: 4.3 Tuning Checkpointing and Recovery 重要配置: 间隔、minPause、超时时间、 程序退出时是否删除 选择合适的 backend Restart strategies 4.4 Monitoring Flink Clusters and Applications Flink WebUI 可以用来初步分析任务:日志、metrics 等。如果要深入分析,则依赖 metrics systems. 4.5 Configuring the Logging Behavior 5 Chapter11: Where to Go from Here? 批处理、TableAPI、SQL、CEP 等

本文介绍了Flink中的有状态操作符和应用程序的实现方法。其中包括键控状态、映射状态、聚合状态等不同类型的状态。文章还讨论了如何实现故障恢复、确保应用程序的可维护性、提高性能和稳定性等方面的内容。此外,还介绍了外部系统的读写和Flink的部署、运行和监控等方面的知识。最后,提到了可查询状态和自定义源和接收器的实现方法。

Reading《Stream Processing with Apache Flink》-2nd
相关推荐 去reddit讨论

Reading《Stream Processing with Apache Flink》-1st

原文约10800字,阅读约需26分钟。发表于:

1 Chapter1: Introduction to Stateful Stream Processing 2 Chapter2: Stream Processing Fundamentals 介绍了 Parallel、Time、State 等概念 Processing Streams in Parallel Latency and Throughput: 延迟、吞吐的关系 Operations on DataStreams: 输入输出、算子、聚合、窗口 Time Semantics: processing time 适合对数据延迟、乱序不敏感的场景;event time 适合对结果要求准确且唯一的场景,引入了 watermark 避免一直等待。 State And Consitency Models: 批的 failover 可以依赖回放数据,但是流不可以;真实世界使用最多的是 At-least-once,如何保证?一个方案是确保保存数据,直到所有 task 都返回了 ACK 3 Chapter3: The Architecture of Apache Flink 3.1 System Architecture Components of a Flink Setup: JobManager(生成和分配ExecutionGraph、任务协调);ResourceManager(跟 resource provider 交互,申请和回收 taskmanager 资源); TaskManager(实际的 worker process);Dispatcher(Rest). 根据环境不同,有的 components 可能跑在一个 JVM Process 上。注意跟现在的已经不一样了 Task Execution: taskmanager 多个 slot,上下游 operator 的 parallelism 不同时,就会发生数据的 exchange. High Available Setup: TaskManager failures: JobManager 跟 ResourceManager 申请新的 slot JobManager failures: 数据持久化到 storage,pointer 存储到 zk,新的 JM 通过 zk 上的 latest complete checkpoint 恢复任务 3.2 Data Transfer In Flink Task Chaining: 用户定义 : chain 为函数间的调用关系 : 有时也会希望在多个线程间执行 : t1=0.1s t2=0.8s t3=0.2s,1 个线程 1qps,因此 10 个线程 10qps;也可以 f1 1 个线程,f2 8个,f3 1个(不过我没想清楚区别在哪) 3.3 Event-Time Processing 相比 Processing-Time 的流系统,实现上更加复杂 Timestamps: 事件的元数据 Watermarks: 如果收到了 T 时间戳的 watermark,则表示 T 之前的数据都已经到达。后续如果有违反该约定的数据,成为 Late Record.在 Handling Late Data 一节分析。合适的 watermark 是在 latency 和 completeness tradeoff. Watermark Propagation and Event Time: 多流的场景,不同流的 watermark 有快有慢,更加复杂 Timestamp Assignment and Watermark Genearation: timestamp、watermark 显示设置,有三种方式: Source Function、AssignerWithPeriodicWatermarks、AssignerWithPunctuatedWatermarks,从 record 提取 timestamp,同时结合配置计算当前的 watermark. 后两者有对应的子类实现。 3.4 State Management: Operator State:,有 ListState、UnionListState、BroadcastState Keyed State:,有 ValueState、ListState、MapState State Backend:state 的读写速度影响 latency Scaling Stateful Operators:,scale out、scale in 都按照 key group,而不是 redistribute. Operator list state : Operator union list state: Operator broadcast state: 3.5 Checkpoints, Savepoints, and State Recovery Consistent Checkpoints: naive mechanism 需要暂停数据输入,待所有 in-flight 的数据都处理完成后再 resume,但是 flink 采用了更 sophisticated 的方法: ,Source 产生 1,2,3,… 的数据,在图中的时刻,checkpoint 记录了 Source offset = 5, 而奇数和偶数的 sum 分别为 9 和 6. Recovery From a Consistent Checkpoint:, task 失败从 checkpoint 恢复时,从 5 之后继续消费,数据是正确且一致的。注意 sink operators 可能收到多条。 Flink’s Checkpointing Algorithm : Flink 没有使用 pause-checkpoint-resume 的做法,而是基于 Chandy-Lamport algorithm for distributed snapshots. 例如这个过程: source 分为两部分,每部分都生成递增的数字,当前状态如图所示: 此时 JobManager 触发 checkpointID=2(三角形): Source 收到后,记录此时 source 的 offset(3, 4),并在当前位置插入 checkpoint barrier(ID=2),跟普通数据一样,发送到下游算子: 下游算子收到后,等待所有上游算子实例的 ID=2 的 barrier:,此时上游算子仍然在产生数据,当前算子也缓存着晚于 barrier 的数据(例如 Source1 产生的蓝色圆圈4) 当所有 ID=2 的 barrier 到达后,该算子也写入 checkpoint 数据(8, 8), 待当前算子发送所有 ID=2 的 barrier 后,处理缓存的数据并发送: 当 sink operators 也 ACK checkpoint 后,就认为 ID=2 的 checkpoint 全部完成 Performance Implications Of Checkpointing: 异步的将 local snapshot to the remote storage;不强制等待 barrier 对齐,而是继续处理并发送数据到下游(代价是恢复时只能 exactly-once,以及随着非对齐增多导致 state 变大?) Savepoints: checkpoints 主要用于失败恢复的场景,但是 consistent snapshots 实际上有更多的用途。Using savepoints:比如 fix bugs and reprocesss 的场景,或者 A/B tests,不过需要 application 前后兼容。修改并发、修改集群、pause-resume. Starting an application from a savepoint : 4 Chapter4: Setting Up a Development Environment for Apache Flink 主要介绍在 IDE 上运行 Flink 任务,注意有些 issue 例如 ClassLoader 跟实际环境是不同的 5 Chapter5: The DataStream API(v1.7) Hello, Flink: 构建一个 flink application 有 5 步: Set Up the Execution Environment Read An Input Stream Apply Transformations Output the result Execute Transformations: Basic Transformations: on individual events, Map/Filter/FlatMap KeyedStream Transformations: in context of a key keyBy: convert DataStream into KeyedStream Rolling aggregations: sum/min/max/minBy/maxBy Reduce MultiStream Transformations: merge into one or split into multiple Union: Connect, coMap, and coFlatMap: DataSteam 的数据是随机处理的,因此 ConnectedStream 常用于两个 KeyedStream、DataStream + Broadcast 以确保结果的确定性,因此用到了 keyedState. Split and select: split 与 union 相反 : ,返回 SplitStream, 通过 select 方法返回不同的 DataStream Distribution Transformations: 普通情况下是由 operation semantics and parallelism 决定的,不过也支持 shuffle/rebalance/rescale(rebalance vs rescale: /broadcast/global/partitionCustom(自定义) Setting the Parallelism: application 和 opertor 级别 Types: 网络传输、读写 statebackend 都会用到 Types; 统一各语言的 type diff,例如 scala 和 java 的 tuple (seg-1)、语言特有的,比如 scala case class Supported Data Types 包括 Primitives、Java and Scala tuples、Scala case classes、POJOS, including classes generated by Apache Arvo、Some special types, 其他类型则 fallback 到 Kryo serialization framework. Creating Type Information for Data Types: type system 的核心类是 TypeInformation,flink 为支持的各种数据类型,都提供了对应的子类实现,例如 NumericTypeInfo 封装了 Integer Long Double Byte Short Float Character 类。 Explicitly Providing Type Information: 自动提取 TypeInformation 失败的场景(例如 java 里的 erasing generic type information),此时就需要显示指定 return 的 TypeInformation 了。 Defining Keys and Referencing Fields : 可以按照 pos、字段名(literal)、KeySelector Implementing Functions : Function 应当是 Serializable,如果存在 non-serializable field,需要 override RichFunction.open 方法 6 Chapter6: Time-Based And Window Operators 6.1 Configuring Time Characteristics 6.1.1 Assigning Timestamp and Generating Watermarks DataStream.assignTimestampsAndWatermarks 的参数类型可以为 AssignerWithPeriodicWatermarks 或者 AssignerWithPunctuatedWatermarks,两者都继承自 TimestampAssigner. 其中: TimestampAssigner.extractTimestamp: 定义了提取 timestamp 的接口 Watermark checkAndGetNextWatermark/ Watermark getCurrentWatermark() :定义了提取 watermark 的接口 AssignerWithPeriodicWatermarks: Watermark getCurrentWatermark() ,watermark 跟 timestamp 有关的场景 # timestamp 为 SensorReading.timestamp # watermark 为当前收到的maxTimestamp - 1min # env.getConfig.setAutoWatermarkInterval(5000) => 每 5s 调用一次 getCurrentWatermark 方法 class PeriodicAssigner     extends AssignerWithPeriodicWatermarks[SensorReading] {   val bound: Long = 60 * 1000     // 1 min in ms   var maxTs: Long = Long.MinValue // the maximum observed timestamp   override def getCurrentWatermark: Watermark = {     // generated watermark with 1 min tolerance     new Watermark(maxTs - bound)   }   override def extractTimestamp(       r: SensorReading,       previousTS: Long): Long = {     // update maximum timestamp     maxTs = maxTs.max(r.timestamp)     // return record timestamp     r.timestamp   } } 明确事件时间递增的前提下,简化为 assignAscendingTimeStamps,相当于使用了内置的 AscendingTimestampExactor implements AssignerWithPeriodicWatermarks ;另外一种常用的内置 AssignerWithPeriodicWatermarks 则是 BoundedOutOfOrdernessTimeStampExtractor. AssignerWithPunctuatedWatermarks: Watermark checkAndGetNextWatermark,watermark 跟 event 自身有关的场景 # sensor_1 携带着 watermark class PunctuatedAssigner     extends AssignerWithPunctuatedWatermarks[SensorReading] {   val bound: Long = 60 * 1000 // 1 min in ms   override def checkAndGetNextWatermark(       r: SensorReading,       extractedTS: Long): Watermark = {     if (r.id == "sensor_1") {       // emit watermark if reading is from sensor_1       new Watermark(extractedTS - bound)     } else {       // do not emit a watermark       null     }   }   override def extractTimestamp(       r: SensorReading,       previousTS: Long): Long = {     // assign record timestamp     r.timestamp   } } 6.1.2 Watermarks, Latency and Completeness Watermarks are used to balance latency and result completenes, 6.2 Process Functions 相比之前介绍的 MapFunction,process functions 是一组 low-level transformation,能够读取到 timestamp, watermark, register timers. 例如:ProcessFunction, KeyedProcessFunction, CoProcessFunction,ProcessJoinFunction, BroadcastProcessFunction,KeyedBroadcastProcessFunction, ProcessWindowFunction, and ProcessAllWindowFunction. 比如 KeyedProcessFunction 提供的接口,支持获取 TimerService,该类支持获取当前的 timestamp, watermark,同时注册基于时间的回调方法: processElement(v: IN, ctx: Context, out: Collector[OUT]) onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT]) 由于使用了回调,注意线程和 cpu 的使用: 例子TempIncreaseAlterFunction:接收到的温度数据里,如果持续升高温度超过 1s,则在 timer 发出数据;如果期间温度降低,则取消 timer. 例子FreezingMonitor:使用了 OutputTag[X] 输出到多个 stream 例子ReadingFilter: 使用了 CoProcessFunction 来处理两个 stream 协同的场景 6.3 Window Operator window 的作用,即将 events 归到一个 bucket,然后基于 bucket 内有限的数据计算。 Tumbling Windows: ,TumblingEventTimeWindows.of TumblingProcessingTimeWindows.of, 默认对齐到 epoch,也可以指定 offset 参数。 Sliding Windows: , SlidingEventTimeWindows.of SlidingProcessingTimeWindows.of Session Window: , EventTimeSessionWindows.withGap ProcessingTimeSessionWindows.withGap 作用于 window 的 function 主要有三类: ReduceFunction: 比如计算窗口里的最大值、最小值等 AggregateFunction: 相比 1 更加灵活,不再限制数据类型,子类需要 override 创建初始值、累加、获取结果、merge 方法. 1 2对 state 使用都较小,因为记录的都是 aggregate 的值。 ProcessWindowFunction: 如果要计算一个窗口内的中间值,就依赖遍历 window 的数据了;所有数据在底层通过 ListState 存储,因此可能变的非常大,最好想办法变成 incrementally aggregated. 自定义 window 由三部分组成:assigner, trigger, evictor incremental aggregation function(记录 aggregation 值): full window function(记录全部 event,使用 ListState): ![]/assets/images/stream_processing_with_apache_flink/Pasted image 20240423095748.png)){:width=”300”} mix: 可以通过 extends WindowAssigner 实现自定义的窗口范围; extends Trigger 用于触发窗口数据的计算和结果返回,onElement onEventTime onProcessingTime返回TriggerResult,CONTINUE, FIRE, PURGE, FIRE_AND_PURGE等枚举值,可以参考内置的 class EventTimeTrigger extends Trigger<Object, TimeWindow>实现。 Evictor 是可选项,似乎仅在 Non Incremental Aggregation Function 里才有意义,没太看懂,具体可以翻翻TopSpeedWindowing的例子代码。 6.4 Joining Stream on Time Interval Join:INNER JOIN 的语义,且仅支持 Event Time. ){:width=”300”} 如图,表示 A 会选择 B 里 [-1hour, +15min] 时间范围,相同 key 的数据;如果 JOIN 不到,则忽略该数据。对应代码实现形如: input1 .keyBy(…) .between(<lower-bound>, <upper-bound>) // bounds with respect to input1 .process(ProcessJoinFunction) // process pairs of matched events B 也是对称的行为,即 JOIN A 对应时间范围内的数据。 按照上述行为,State 里就需要存储: A 里 >= CurrentWatermark - 15Min 的数据(B 可能会 JOIN) B 里 >= CurrentWatermark - 1Hour 的数据(A 可能会 JOIN) 如果两者的 watermark 对不齐,那则取决于更慢的那条流。注:此时 State 可能会遇到读写、大小的瓶颈 Window Join: Tumbling Window Join 的效果: 6.5 Handling Late Data 处理迟到的数据有三种方式:Drop, Redirecting, Updating Results By Including Late Events. Redirecting 主要依赖 Side-Output feature: Window Operator With Side-Output: .timeWindow.sideOutputLateData 在 ProcessFunction 里比较: class LateReadingsFilter extends ProcessFunction[SensorReading, SensorReading] { val lateReadingsOut = new OutputTag[SensorReading]("late-readings") override def processElement( r: SensorReading, ctx: ProcessFunction[SensorReading, SensorReading]#Context, out: Collector[SensorReading]): Unit = { // compare record timestamp with current watermark if (r.timestamp < ctx.timerService().currentWatermark()) { // this is a late reading => redirect it to the side output ctx.output(lateReadingsOut, r) } else { out.collect(r) } } } allowedLateness允许迟到的数据再次参与 window 的计算(潜在行为即 window 的数据保留时间更长)

本文介绍了Apache Flink的有状态流处理的基本概念和架构,包括流处理的基本操作、时间语义、状态管理、检查点和恢复、开发环境搭建、DataStream API的使用、基于时间的操作和窗口操作、流的连接和迟到数据处理等内容。

Reading《Stream Processing with Apache Flink》-1st
相关推荐 去reddit讨论

GaussDB(DWS)基于Flink的实时数仓构建

原文约2300字,阅读约需6分钟。发表于:

深度解析GaussDB(DWS)+Flink如何增强湖仓增量数据在不同数据模型层之间的实时流动能力,如何为消息数据流提供高性能通用入库能力,又如何构建极致的端到端实时数仓解决方案。

华为云数仓GaussDB(DWS)利用Flink实现实时数仓构建,提供快速分析查询能力。增量计算解决高性能和数据入库问题。GaussDB(DWS)与Flink结合构建下一代Stream Warehouse,实现实时入出仓、实时增量加工和实时查询。GaussDB(DWS)结合Flink的能力包括Catalog、Source、Sink和流维。生态工具streamer简化数据入库操作。

相关推荐 去reddit讨论

AWS Graviton3 加速 Flink 作业执行:Benchmark

原文约6300字,阅读约需15分钟。发表于:

本文将比较 5 种相同大小的 EC2 实例在 Flink 集群执行 Nexmark Benchmark 时的性能和成本,并提供 Benchmark 环境搭建和执行的操作步骤。

AWS推出Graviton3,使用ARM Neoverse内核定制设计的ARM架构,提高计算性能25%。在中国区域推出C7g、M7g和R7g实例。比较了5种相同配置的EC2实例在执行Nexmark Benchmark时的性能和成本。

AWS Graviton3 加速 Flink 作业执行:Benchmark
相关推荐 去reddit讨论

Flink Keyed State的优化与实践

原文约4600字,阅读约需11分钟。发表于:

本文的内容主要是从业务场景跟进到RocksDB的读写行为,来优化RT耗时高的问题,并使用优化方案缓解compaction的压力。

Flink SQL在双流join场景中,当State的存储达到TB级别后,会发现State的scan/next/readNull请求RT会变得较高。通过使用RocksDB的BlobDB方案,可以大大降低IO和CPU毛刺。经过适配并在大State中开启KV分离后,观察RocksDB日志发现SST的文件大小急剧下降,State Key也全聚集在了L0和L1这两层中。最后的效果是ReadNull耗时全降到了百微妙左右,scan和next的RT 99线也降到了1毫秒左右。

相关推荐 去reddit讨论

Flink 流批一体在模型特征场景的使用

原文约5600字,阅读约需14分钟。发表于:

本文整理自B站资深开发工程师张杨老师在 Flink Forward Asia 2023 中 AI 特征工程专场的分享。

本文总结了B站资深开发工程师张杨在Flink Forward Asia 2023中的分享,包括模型特征场景、流批一体、性能优化和未来展望。

相关推荐 去reddit讨论

使用 SPL 高效实现 Flink SLS Connector 下推

原文约800字,阅读约需3分钟。发表于:

SLS 推出了 SPL 语言,可以高效的对日志数据的清洗,加工。对 SPL 及 SPL 在阿里云 Flink SLS Connector 中应用进行介绍及举例。

阿里云的日志服务(SLS)和Flink集成使用,通过SPL语言实现数据清洗和加工,减少网络和计算开销。SPL语言支持行过滤、列裁剪等操作。在Flink中使用SLS SPL可以实现行过滤和列裁剪的功能。

相关推荐 去reddit讨论

hive 、spark 、flink之想一想

原文约900字,阅读约需2分钟。发表于:

19:flink反压机制,你是如何理解的?17:flink的cp ,sp说一说原理,有什么区别?你们的上下游的环境都是什么?9:spark中的app,job,stage,task是什么?25:flink消费kafka的offset是怎么维护的?10:spark的RDD是什么?18:flink的四个图是什么?20:flink的barrier对齐和非对齐是怎么理解的?13:spark 与mapreduce的区别是什么?14: spark的反压原理是什么?21:flink的精准一次和至少一次是怎么理解的?

本文介绍了Hive、Spark和Flink的产生、框架、执行流程、SQL语句执行、调优等方面的知识。

相关推荐 去reddit讨论

实时数据处理:Kafka 和 Flink

原文约3500字,阅读约需9分钟。发表于:

在大数据时代,实时洞察是保持领先的关键。但是如何利用不断流动的数据流的力量呢?Apache Kafka 和 Apache Flink登场,这对实时数据处理带来革命性变革的梦之队。这对充满活力的二人组协同工作,使您能够释放数据的真正潜力,从而实现即时洞察和明智的决策。更深入地了解 Kafka 和 Flink 如何联手创建实时数据引擎。为什么实时分析很重要在以数据驱动为特征的当代商业环境中,出现了一种关键能力:利用从实时数据中收集的见解的能力。这种对生成数据的理解和响应的熟练程度不再被认为是次要的好处,而是一种基本的必要性。正是在这种背景下,引入了实时数据处理,为组织提供了多种优势。首先,实时数据

Kafka和Flink是实时数据处理的重要工具,可以帮助企业实现即时洞察和明智决策。它们共同组成一个协同二人组,提供高效的实时数据处理。通过协同工作,Kafka和Flink可以实现欺诈检测、顾客行为分析、股市分析和物联网数据处理等应用。

相关推荐 去reddit讨论

如何高效接入 Flink: Connecter / Catalog API 核心设计与社区进展

原文约10700字,阅读约需26分钟。发表于:

本文整理自阿里云实时计算团队 Apache Flink Committer 和 PMC Member 任庆盛在 FFA 2023 核心技术专场(二)中的分享。

本文介绍了Flink的Connecter/Catalog API的设计和使用方法,包括Source API和Sink API,以及集成到Table/SQL API和Catalog API中。同时提到了Catalog API的作用和血缘信息管理的功能。

相关推荐 去reddit讨论