标签:apache

相关的文章:

这是一个包含 Apache 技术文章的列表页,涵盖了 Apache Camel、Apache Doris、Apache Paimon、Apache SeaTunnel、Apache Kafka、Apache Airflow、Apache Spark Structured Streaming 等多个技术领域的文章。

Apache RocketMQ ACL 2.0 全新升级

原文约13100字,阅读约需32分钟。发表于:

RocketMQ ACL 2.0 不管是在模型设计、可扩展性方面,还是安全性和性能方面都进行了全新的升级。旨在能够为用户提供精细化的访问控制,同时,简化权限的配置流程。欢迎大家尝试体验新版本,并应用在生产环境中。

RocketMQ ACL 2.0 是流行的分布式消息中间件 RocketMQ 中访问控制组件(ACL)的全面升级版本,解决了现有 ACL 1.0 版本面临的安全挑战,并引入了细粒度的 API 资源权限定义、多种匹配模式的授权资源、支持集群组件访问控制、用户身份验证和权限验证的分离、安全性和性能之间的平衡以及灵活可扩展的插件机制等新功能。文章还提供了访问控制模型、身份验证和授权过程、审计日志、部署架构、集群配置、用户和 ACL 管理以及扩展和迁移策略的信息。

Reading《Stream Processing with Apache Flink》-2nd

原文英文,约1700词,阅读约需7分钟。发表于:

1 Chapter7: Stateful Operators And Applications

1.1 Implementing Stateful Functions

Keyed State:

- ValueState[T]: single value. The value can be read using ValueState.value() and updated with ValueState.update(value: T)
- ListState[T]: list of elements. 常用接口有 add、addAll、get、update
- MapState[K, V]: map of keys and values. 常用接口有 get、put、contains、remove
- ReducingState[T]: 类似 ListState,但是不存储全部 list,而是 immediately aggregates value using a ReduceFunction
- AggregatingState[I, O]: 类似 reduce 和 aggregate 的关系,更加通用化

```scala
val sensorData: DataStream[SensorReading] = ???

// partition and key the stream on the sensor ID
val keyedData: KeyedStream[SensorReading, String] = sensorData
  .keyBy(_.id)

// apply a stateful FlatMapFunction on the keyed stream which
// compares the temperature readings and raises alerts
val alerts: DataStream[(String, Double, Double)] = keyedData
  .flatMap(new TemperatureAlertFunction(1.7))

class TemperatureAlertFunction(val threshold: Double)
    extends RichFlatMapFunction[SensorReading, (String, Double, Double)] {

  // the state handle object
  private var lastTempState: ValueState[Double] = _

  override def open(parameters: Configuration): Unit = {
    // create state descriptor
    val lastTempDescriptor =
      new ValueStateDescriptor[Double]("lastTemp", classOf[Double])
    // obtain the state handle
    lastTempState = getRuntimeContext.getState[Double](lastTempDescriptor)
  }

  override def flatMap(
      reading: SensorReading,
      out: Collector[(String, Double, Double)]): Unit = {
    // fetch the last temperature from state
    val lastTemp = lastTempState.value()
    // check if we need to emit an alert
    val tempDiff = (reading.temperature - lastTemp).abs
    if (tempDiff > threshold) {
      // temperature changed by more than the threshold
      out.collect((reading.id, reading.temperature, tempDiff))
    }
    // update lastTemp state
    this.lastTempState.update(reading.temperature)
  }
}
```

State 存储时使用 Flink 的 TypeInformation(序列化、反序列化);StateDescriptor 是函数从 StateBackend 获取/注册 State 的描述符。

Operator List State: 可以继承

```java
public interface ListCheckpointed<T extends Serializable> {
  List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
  void restoreState(List<T> state) throws Exception;
}
```

注意:该接口已经标记 @Deprecated,建议使用 CheckpointedFunction。

Broadcast State: 典型的场景是 a stream of rules and a stream of events on which the rules are applied,即事件流和规则流。

```scala
val sensorData: DataStream[SensorReading] = ???
val thresholds: DataStream[ThresholdUpdate] = ???
val keyedSensorData: KeyedStream[SensorReading, String] = sensorData.keyBy(_.id)

// the descriptor of the broadcast state
val broadcastStateDescriptor = new MapStateDescriptor[String, Double](
  "thresholds", classOf[String], classOf[Double])

val broadcastThresholds: BroadcastStream[ThresholdUpdate] = thresholds
  .broadcast(broadcastStateDescriptor)

// connect keyed sensor stream and broadcasted rules stream
val alerts: DataStream[(String, Double, Double)] = keyedSensorData
  .connect(broadcastThresholds)
  .process(new UpdatableTemperatureAlertFunction())
```

注意 Broadcast events 可能乱序。

CheckpointedFunction、CheckpointListener 跟 checkpoint 紧密相关,前者在触发 checkpoint 时调用,可以定义各类 State(例如 ValueState、ListState 等),后者则注册了 checkpoint 完成时的回调,下面附一段示意代码。
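补一段 CheckpointedFunction 的使用示意(非书中原文,按 Flink 1.7 左右的接口写;BufferingSink 这个名字和"攒批写出"的逻辑只是假设的例子):一个带本地缓冲的 sink,在 snapshotState 时把缓冲内容写入 operator state,在 initializeState 时注册 state 并在恢复场景下读回缓冲。

```scala
import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer

import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction

// 攒够 threshold 条再写出的示意 sink,缓冲内容通过 operator ListState 做容错
class BufferingSink(threshold: Int)
    extends SinkFunction[(String, Int)]
    with CheckpointedFunction {

  // checkpoint 时使用的 operator state 句柄
  @transient private var checkpointedState: ListState[(String, Int)] = _
  // 运行期的本地缓冲
  private val bufferedElements = ListBuffer[(String, Int)]()

  override def invoke(value: (String, Int), context: SinkFunction.Context[_]): Unit = {
    bufferedElements += value
    if (bufferedElements.size >= threshold) {
      // 这里省略真正的外部写出逻辑
      bufferedElements.clear()
    }
  }

  // 触发 checkpoint 时调用:把当前缓冲写入 state
  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    checkpointedState.clear()
    bufferedElements.foreach(checkpointedState.add)
  }

  // 初始化/恢复时调用:注册 state,恢复场景下读回缓冲
  override def initializeState(context: FunctionInitializationContext): Unit = {
    val descriptor = new ListStateDescriptor[(String, Int)](
      "buffered-elements",
      TypeInformation.of(new TypeHint[(String, Int)]() {}))
    checkpointedState = context.getOperatorStateStore.getListState(descriptor)

    if (context.isRestored) {
      for (element <- checkpointedState.get().asScala) {
        bufferedElements += element
      }
    }
  }
}
```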
1.2 Enabling Failure Recovery for Stateful Applications

1.3 Ensuring the Maintainability of Stateful Applications
任务会经常变动:Bugs need to be fixed, functionality adjusted, added, or removed, or the parallelism of the operator needs to be adjusted to account for higher or lower data rates. 为了确保任务的可维护性,关于 state 有两点需要注意:

- Specifying Unique Operator Identifiers: 最好从程序开始就为每个 operator 指定
- Defining the Maximum Parallelism of Keyed State Operators: setMaxParallelism 在这里更确切的作用是 setCountOfKeyGroups

1.4 Performance and Robustness of Stateful Applications
StateBackend: MemoryStateBackend、FsStateBackend 和 RocksDBStateBackend。使用 RocksDBStateBackend 时,不同 State 类型性能差别较大,比如 MapState[X, Y] 比 ValueState[HashMap[X, Y]] 性能更高,ListState[X] 比 ValueState[List[X]] 更适合频繁追加数据的场景。

滥用 state 会导致 state 过大的问题,比如 KeyedStream.aggregate 而 key 无限制,典型的比如统计用户行为时的 sessionId。可以使用 timer 清理 state,确保 state 不会引发问题。例如:

```scala
class SelfCleaningTemperatureAlertFunction(val threshold: Double)
    extends KeyedProcessFunction[String, SensorReading, (String, Double, Double)] {

  // the keyed state handle for the last temperature
  private var lastTempState: ValueState[Double] = _
  // the keyed state handle for the last registered timer
  private var lastTimerState: ValueState[Long] = _

  override def open(parameters: Configuration): Unit = {
    // register state for last temperature
    val lastTempDesc = new ValueStateDescriptor[Double]("lastTemp", classOf[Double])
    lastTempState = getRuntimeContext.getState[Double](lastTempDesc)
    // register state for last timer
    val lastTimerDesc = new ValueStateDescriptor[Long]("lastTimer", classOf[Long])
    lastTimerState = getRuntimeContext.getState(lastTimerDesc)
  }

  override def processElement(
      reading: SensorReading,
      ctx: KeyedProcessFunction
        [String, SensorReading, (String, Double, Double)]#Context,
      out: Collector[(String, Double, Double)]): Unit = {

    // compute timestamp of new clean up timer as record timestamp + one hour
    val newTimer = ctx.timestamp() + (3600 * 1000)
    // get timestamp of current timer
    val curTimer = lastTimerState.value()
    // delete previous timer and register new timer
    ctx.timerService().deleteEventTimeTimer(curTimer)
    ctx.timerService().registerEventTimeTimer(newTimer)
    // update timer timestamp state
    lastTimerState.update(newTimer)

    // fetch the last temperature from state
    val lastTemp = lastTempState.value()
    // check if we need to emit an alert
    val tempDiff = (reading.temperature - lastTemp).abs
    if (tempDiff > threshold) {
      // temperature changed by more than the threshold
      out.collect((reading.id, reading.temperature, tempDiff))
    }
    // update lastTemp state
    this.lastTempState.update(reading.temperature)
  }

  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction
        [String, SensorReading, (String, Double, Double)]#OnTimerContext,
      out: Collector[(String, Double, Double)]): Unit = {
    // clear all state for the key
    lastTempState.clear()
    lastTimerState.clear()
  }
}
```

1.5 Evolving Stateful Applications

- Updating an Application without Modifying Existing State: compatible
- Changing the Input Data Type of Built-in Stateful Operators: not compatible
- Removing State from an Application: 默认 avoid losing state,可以关闭
- Modifying the State of an Operator: 比如 ValueState[String] 修改为 ValueState[Double],兼容不全,尽量避免

1.6 Queryable State
支持 state 的点查和读取,依赖 flink-queryable-state-client-java。注:之前调研 flink 时,这个功能看上去非常强大,不过目前在官网已经看不到相关文档了。

2 Chapter8: Reading from and Writing to External Systems

2.1 Application Consistency Guarantees
如果想不丢数据,source 需要是 resettable 的,例如读文件时 File ByteStream 的 offset,读 kafka 时 TopicPartition 的 offset。但是如果想要 end-to-end exactly-once,sink connectors 还需要支持 idempotent writes 或 transactional writes,后者比如 write-ahead-log (WAL) sink、two-phase-commit (2PC) sink。
|  | Nonresettable source | Resettable source |
| --- | --- | --- |
| Any sink | At-most-once | At-least-once |
| Idempotent sink | At-most-once | Exactly-once*(恢复期间可能出现短暂不一致) |
| WAL sink | At-most-once | At-least-once |
| 2PC sink | At-most-once | Exactly-once |

注意 WAL sink 即使仅在 checkpoint complete 的时候才写出数据,也无法做到 Exactly-once。

2.2 Provided Connectors
Kafka、Filesystem 等,官网比书里已经更详细了。

2.3 Implementing a Custom Source Function

- SourceFunction and RichSourceFunction can be used to define nonparallel source connectors—sources that run with a single task.
- ParallelSourceFunction and RichParallelSourceFunction can be used to define source connectors that run with multiple parallel task instances.

注:接口后来有变化。

当 checkpoint 进行的时候,需要记录此时的 offset,就需要避免 SourceFunction.run() emit data;换句话说,CheckpointedFunction.snapshotState 和该方法同时只能执行一个。另外需要注意 SourceFunction 某个 parallelism idle 时不会发出 watermark,可能导致整个任务一直在等待的情形。

2.4 Implementing a Custom Sink Function

- Idempotent Sink Connectors: 要求结果数据有 deterministic (composite) key,且存储支持幂等写入。
- Transactional Sink Connectors:
  - GenericWriteAheadSink: 先写 WAL,收到 CheckpointCompleted 时写入到存储。听上去似乎很完美,但实际上只能做到 At-least-once,有两种情况:存储的批量写入不是原子的;存储写入成功,但 commit checkpoint 时失败。
  - TwoPhaseCommitSinkFunction:
    1. sink operator 收到 checkpoint barrier:persists its state, prepares the current transaction for committing, and acknowledges the checkpoint at the JobManager.
    2. JobManager 收到所有 task instances 的 successful checkpoint notifications.
    3. sink operator 收到 checkpoint completed 消息:commits all open transactions of previous checkpoints.

我理解 commit 确保了持久化:如果 commit 失败的话,preCommit 的操作会被回滚,确保不会对 storage system 产生影响,因而保证了 Exactly-once 语义。书里有一个 TransactionalFileSink 的例子,很直观。当然支持该语义带来的问题也需要注意,一是 checkpoint 完成后数据才可见;二是对 kafka transaction timeout 调优,避免一直 commit 失败导致可能的数据丢失。

2.5 Asynchronously Accessing External Systems
异步查询词典的场景。

3 Chapter9: Setting Up Flink for Streaming Applications

3.1 Deployment Modes

- Standalone Cluster: 启动、提交
- Docker
- YARN: Job Mode、Session Mode 的启动与提交(注:后来还有 Application Mode)
- Kubernetes: 生产环境的目标状态应当还是容器化部署

3.2 Highly Available Setups
HA 的目的是 as little downtime as possible。TaskManager 失败可以由 ResourceManager 恢复,JobManager 失败则依赖于 HA 部署。HA 需要考虑的存储有:JAR file、the JobGraph、and pointers to completed checkpoints。书里介绍了 ZooKeeper HA Services,当前还有 Kubernetes HA Services。实践经验里看还是有些坑的,尤其是 Yarn 相关参数。

3.3 Integration with Hadoop Components

3.4 Filesystem Configuration

3.5 System Configuration

- Java and Classloading: Flink 提供了 User-code class loaders,注意 classloader.resolve-order 相关的配置。
- CPU: task 在 TaskManager 的线程运行,以 slot 的方式对外提供。
- Main Memory and Network Buffers: JM、TM 内存重点不同,额外注意 network buffer 和 rocksdb backend。
- Disk Storage
- Checkpointing and State Backends
- Security

4 Chapter10: Operating Flink and Streaming Applications

4.1 Running and Managing Streaming Applications
这节提到了 "maintaining streaming applications is more challenging than maintaining batch applications",我个人觉得对于 streaming applications,maintaining 比 develop 更具挑战性。maintaining = start, stop, pause and resume, scale, and upgrade。操作 flink 任务可以使用 CLI 或者 REST API。savepoint 相关功能最好通过 uid() 定义 Unique Operator IDs。Kubernetes 相关的内容已经过时,建议直接参考文档。

4.2 Controlling Task Scheduling
Task Chaining 可以将网络通信转为线程内方法的直接调用,因此 Flink 默认开启,如有必要可以通过 disableChaining、startNewChain 调优。Slot-Sharing Groups 允许用户自己协调计算密集和 IO 密集的 task:

```scala
// slot-sharing group "green"
val a: DataStream[A] = env.createInput(...)
  .slotSharingGroup("green")
  .setParallelism(4)
val b: DataStream[B] = a.map(...)
  // slot-sharing group "green" is inherited from a
  .setParallelism(4)

// slot-sharing group "yellow"
val c: DataStream[C] = env.createInput(...)
  .slotSharingGroup("yellow")
  .setParallelism(2)

// slot-sharing group "blue"
val d: DataStream[D] = b.connect(c.broadcast(...)).process(...)
  .slotSharingGroup("blue")
  .setParallelism(4)
val e = d.addSink()
  // slot-sharing group "blue" is inherited from d
  .setParallelism(2)
```

如上代码,不同 task 分配的效果见原书图示。

4.3 Tuning Checkpointing and Recovery
重要配置:间隔、minPause、超时时间、程序退出时是否删除;选择合适的 backend;Restart strategies(文末附一段配置示意)。

4.4 Monitoring Flink Clusters and Applications
Flink WebUI 可以用来初步分析任务:日志、metrics 等。如果要深入分析,则依赖 metrics systems。

4.5 Configuring the Logging Behavior

5 Chapter11: Where to Go from Here?
批处理、TableAPI、SQL、CEP 等。
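针对 4.3 节提到的 checkpoint 与重启相关配置,补一段配置示意(非书中原文;API 以书中对应的 Flink 1.7 左右版本为准,checkpoint 路径与各时间参数均为假设的示例值):

```scala
import java.util.concurrent.TimeUnit

import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// 间隔:每 60s 触发一次 checkpoint
env.enableCheckpointing(60 * 1000)

val checkpointConf = env.getCheckpointConfig
// minPause:两次 checkpoint 之间至少间隔 30s
checkpointConf.setMinPauseBetweenCheckpoints(30 * 1000)
// 超时时间:超过 10min 未完成则丢弃本次 checkpoint
checkpointConf.setCheckpointTimeout(10 * 60 * 1000)
// 程序 cancel 时保留 externalized checkpoint,便于手动恢复
checkpointConf.enableExternalizedCheckpoints(
  ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

// 选择合适的 backend:这里假设使用 RocksDB 增量 checkpoint,路径为示例值
env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true))

// Restart strategy:失败后最多重启 3 次,每次间隔 10s
env.setRestartStrategy(
  RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)))
```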

本文介绍了Flink中的有状态操作符和应用程序的实现方法。其中包括键控状态、映射状态、聚合状态等不同类型的状态。文章还讨论了如何实现故障恢复、确保应用程序的可维护性、提高性能和稳定性等方面的内容。此外,还介绍了外部系统的读写和Flink的部署、运行和监控等方面的知识。最后,提到了可查询状态和自定义源和接收器的实现方法。

Reading《Stream Processing with Apache Flink》-1st

原文约10800字,阅读约需26分钟。发表于:

1 Chapter1: Introduction to Stateful Stream Processing

2 Chapter2: Stream Processing Fundamentals
介绍了 Parallel、Time、State 等概念:

- Processing Streams in Parallel
- Latency and Throughput: 延迟、吞吐的关系
- Operations on DataStreams: 输入输出、算子、聚合、窗口
- Time Semantics: processing time 适合对数据延迟、乱序不敏感的场景;event time 适合对结果要求准确且唯一的场景,引入了 watermark 避免一直等待。
- State And Consistency Models: 批的 failover 可以依赖回放数据,但是流不可以;真实世界使用最多的是 At-least-once,如何保证?一个方案是确保保存数据,直到所有 task 都返回了 ACK。

3 Chapter3: The Architecture of Apache Flink

3.1 System Architecture

- Components of a Flink Setup: JobManager(生成和分配 ExecutionGraph、任务协调)、ResourceManager(跟 resource provider 交互,申请和回收 taskmanager 资源)、TaskManager(实际的 worker process)、Dispatcher(REST)。根据环境不同,有的 components 可能跑在一个 JVM Process 上。注意跟现在的已经不一样了。
- Task Execution: taskmanager 有多个 slot,上下游 operator 的 parallelism 不同时,就会发生数据的 exchange。
- Highly Available Setup:
  - TaskManager failures: JobManager 跟 ResourceManager 申请新的 slot
  - JobManager failures: 数据持久化到 storage,pointer 存储到 zk,新的 JM 通过 zk 上的 latest complete checkpoint 恢复任务

3.2 Data Transfer In Flink
Task Chaining: 用户定义的 chain 为函数间的调用关系;有时也会希望在多个线程间执行,例如 t1=0.1s、t2=0.8s、t3=0.2s 时,1 个线程 1qps,因此 10 个线程 10qps;也可以 f1 1 个线程、f2 8 个、f3 1 个(不过我没想清楚区别在哪)。

3.3 Event-Time Processing
相比 Processing-Time 的流系统,实现上更加复杂:

- Timestamps: 事件的元数据
- Watermarks: 如果收到了 T 时间戳的 watermark,则表示 T 之前的数据都已经到达,后续如果有违反该约定的数据,称为 Late Record,在 Handling Late Data 一节分析。合适的 watermark 是在 latency 和 completeness 之间做 tradeoff。
- Watermark Propagation and Event Time: 多流的场景,不同流的 watermark 有快有慢,更加复杂。
- Timestamp Assignment and Watermark Generation: timestamp、watermark 显式设置,有三种方式:Source Function、AssignerWithPeriodicWatermarks、AssignerWithPunctuatedWatermarks,从 record 提取 timestamp,同时结合配置计算当前的 watermark。后两者有对应的子类实现。

3.4 State Management

- Operator State: 有 ListState、UnionListState、BroadcastState
- Keyed State: 有 ValueState、ListState、MapState
- State Backend: state 的读写速度影响 latency
- Scaling Stateful Operators: scale out、scale in 都按照 key group,而不是 redistribute;Operator list state、Operator union list state、Operator broadcast state 的扩缩容方式见原书图示。

3.5 Checkpoints, Savepoints, and State Recovery

- Consistent Checkpoints: naive mechanism 需要暂停数据输入,待所有 in-flight 的数据都处理完成后再 resume,但是 flink 采用了更 sophisticated 的方法。书中示例:Source 产生 1,2,3,… 的数据,在图中的时刻,checkpoint 记录了 Source offset = 5,而奇数和偶数的 sum 分别为 9 和 6。
- Recovery From a Consistent Checkpoint: task 失败从 checkpoint 恢复时,从 5 之后继续消费,数据是正确且一致的。注意 sink operators 可能收到多条。
- Flink's Checkpointing Algorithm: Flink 没有使用 pause-checkpoint-resume 的做法,而是基于 Chandy-Lamport algorithm for distributed snapshots。例如这个过程(原书有逐步配图):source 分为两部分,每部分都生成递增的数字;此时 JobManager 触发 checkpointID=2(三角形);Source 收到后,记录此时 source 的 offset(3, 4),并在当前位置插入 checkpoint barrier(ID=2),跟普通数据一样发送到下游算子;下游算子收到后,等待所有上游算子实例的 ID=2 的 barrier,此时上游算子仍然在产生数据,当前算子也缓存着晚于 barrier 的数据(例如 Source1 产生的蓝色圆圈 4);当所有 ID=2 的 barrier 到达后,该算子也写入 checkpoint 数据(8, 8),待当前算子发送所有 ID=2 的 barrier 后,处理缓存的数据并发送;当 sink operators 也 ACK checkpoint 后,就认为 ID=2 的 checkpoint 全部完成。
- Performance Implications Of Checkpointing: 异步地将 local snapshot 写到 remote storage;也可以不强制等待 barrier 对齐,而是继续处理并发送数据到下游(代价是该 checkpoint 只能保证 at-least-once,以及随着未对齐数据增多可能导致 state 变大?)。
- Savepoints: checkpoints 主要用于失败恢复的场景,但是 consistent snapshots 实际上有更多的用途。Using savepoints:比如 fix bugs and reprocess 的场景,或者 A/B tests(不过需要 application 前后兼容)、修改并发、修改集群、pause-resume。
- Starting an application from a savepoint:

4 Chapter4: Setting Up a Development Environment for Apache Flink
主要介绍在 IDE 上运行 Flink 任务,注意有些 issue,例如 ClassLoader 跟实际环境是不同的。

5 Chapter5: The DataStream API(v1.7)
Hello, Flink: 构建一个 flink application 有 5 步(下面附一段最小示意):

1. Set Up the Execution Environment
2. Read An Input Stream
3. Apply Transformations
4. Output the result
5. Execute
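按这 5 步给出一段最小示意(非书中原文;SensorSource、SensorReading 为书中示例工程里的类型,这里假设它们可用,窗口逻辑只是占位):

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object HelloFlink {
  def main(args: Array[String]): Unit = {
    // 1. set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 2. read an input stream(SensorSource 为假设可用的示例 source)
    val readings: DataStream[SensorReading] = env.addSource(new SensorSource)

    // 3. apply transformations:按 sensor id 分组,每 5 秒取最高温度
    val maxTemp: DataStream[SensorReading] = readings
      .keyBy(_.id)
      .timeWindow(Time.seconds(5))
      .reduce((r1, r2) => if (r1.temperature > r2.temperature) r1 else r2)

    // 4. output the result
    maxTemp.print()

    // 5. execute
    env.execute("Hello Flink")
  }
}
```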
Transformations:

- Basic Transformations: on individual events,Map/Filter/FlatMap
- KeyedStream Transformations: in context of a key
  - keyBy: convert DataStream into KeyedStream
  - Rolling aggregations: sum/min/max/minBy/maxBy
  - Reduce
- MultiStream Transformations: merge into one or split into multiple
  - Union
  - Connect, coMap, and coFlatMap: DataStream 的数据是随机处理的,因此 ConnectedStream 常用于两个 KeyedStream、或 DataStream + Broadcast,以确保结果的确定性,因此用到了 keyedState。
  - Split and select: split 与 union 相反,返回 SplitStream,通过 select 方法返回不同的 DataStream。
- Distribution Transformations: 普通情况下是由 operation semantics and parallelism 决定的,不过也支持 shuffle/rebalance/rescale/broadcast/global/partitionCustom(自定义);rebalance 与 rescale 的区别见原书图示。
- Setting the Parallelism: application 和 operator 级别。

Types: 网络传输、读写 statebackend 都会用到 Types;统一各语言的 type diff,例如 scala 和 java 的 tuple、语言特有的类型(比如 scala case class)。

- Supported Data Types 包括 Primitives、Java and Scala tuples、Scala case classes、POJOs(including classes generated by Apache Avro)、Some special types,其他类型则 fallback 到 Kryo serialization framework。
- Creating Type Information for Data Types: type system 的核心类是 TypeInformation,flink 为支持的各种数据类型都提供了对应的子类实现,例如 NumericTypeInfo 封装了 Integer、Long、Double、Byte、Short、Float、Character 类。
- Explicitly Providing Type Information: 自动提取 TypeInformation 失败的场景(例如 java 里的 erasing generic type information),此时就需要显式指定 return 的 TypeInformation 了。

Defining Keys and Referencing Fields: 可以按照 pos、字段名(literal)、KeySelector。
Implementing Functions: Function 应当是 Serializable,如果存在 non-serializable field,需要 override RichFunction.open 方法。

6 Chapter6: Time-Based And Window Operators

6.1 Configuring Time Characteristics

6.1.1 Assigning Timestamp and Generating Watermarks
DataStream.assignTimestampsAndWatermarks 的参数类型可以为 AssignerWithPeriodicWatermarks 或者 AssignerWithPunctuatedWatermarks,两者都继承自 TimestampAssigner。其中:

- TimestampAssigner.extractTimestamp: 定义了提取 timestamp 的接口
- Watermark checkAndGetNextWatermark / Watermark getCurrentWatermark(): 定义了提取 watermark 的接口

AssignerWithPeriodicWatermarks: Watermark getCurrentWatermark(),适合 watermark 跟 timestamp 有关的场景:

```scala
// timestamp 为 SensorReading.timestamp
// watermark 为当前收到的 maxTimestamp - 1min
// env.getConfig.setAutoWatermarkInterval(5000) => 每 5s 调用一次 getCurrentWatermark 方法
class PeriodicAssigner
    extends AssignerWithPeriodicWatermarks[SensorReading] {

  val bound: Long = 60 * 1000     // 1 min in ms
  var maxTs: Long = Long.MinValue // the maximum observed timestamp

  override def getCurrentWatermark: Watermark = {
    // generated watermark with 1 min tolerance
    new Watermark(maxTs - bound)
  }

  override def extractTimestamp(
      r: SensorReading,
      previousTS: Long): Long = {
    // update maximum timestamp
    maxTs = maxTs.max(r.timestamp)
    // return record timestamp
    r.timestamp
  }
}
```

明确事件时间递增的前提下,可简化为 assignAscendingTimestamps,相当于使用了内置的 AscendingTimestampExtractor implements AssignerWithPeriodicWatermarks;另外一种常用的内置 AssignerWithPeriodicWatermarks 则是 BoundedOutOfOrdernessTimestampExtractor(用法见下面的示意)。
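下面是一段 BoundedOutOfOrdernessTimestampExtractor 的使用示意(非书中原文,按 Flink 1.7 左右的 API 写;SensorReading 为书中示例类型,1 分钟的乱序容忍只是示例值):

```scala
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

val readings: DataStream[SensorReading] = ???

// 以 record 自带的 timestamp 作为事件时间,容忍最多 1 分钟的乱序:
// watermark = 当前观察到的最大 timestamp - 1min
val withTimestamps: DataStream[SensorReading] = readings
  .assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.minutes(1)) {
      override def extractTimestamp(r: SensorReading): Long = r.timestamp
    })
```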
AssignerWithPunctuatedWatermarks: Watermark checkAndGetNextWatermark,适合 watermark 跟 event 自身有关的场景:

```scala
// sensor_1 携带着 watermark
class PunctuatedAssigner
    extends AssignerWithPunctuatedWatermarks[SensorReading] {

  val bound: Long = 60 * 1000 // 1 min in ms

  override def checkAndGetNextWatermark(
      r: SensorReading,
      extractedTS: Long): Watermark = {
    if (r.id == "sensor_1") {
      // emit watermark if reading is from sensor_1
      new Watermark(extractedTS - bound)
    } else {
      // do not emit a watermark
      null
    }
  }

  override def extractTimestamp(
      r: SensorReading,
      previousTS: Long): Long = {
    // assign record timestamp
    r.timestamp
  }
}
```

6.1.2 Watermarks, Latency and Completeness
Watermarks are used to balance latency and result completeness.

6.2 Process Functions
相比之前介绍的 MapFunction,process functions 是一组 low-level transformation,能够读取到 timestamp、watermark,并可以 register timers。例如:ProcessFunction、KeyedProcessFunction、CoProcessFunction、ProcessJoinFunction、BroadcastProcessFunction、KeyedBroadcastProcessFunction、ProcessWindowFunction 和 ProcessAllWindowFunction。

比如 KeyedProcessFunction 提供的接口,支持获取 TimerService,该类支持获取当前的 timestamp、watermark,同时注册基于时间的回调方法:

- processElement(v: IN, ctx: Context, out: Collector[OUT])
- onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT])

由于使用了回调,注意线程和 cpu 的使用。书中的几个例子:

- TempIncreaseAlertFunction:接收到的温度数据里,如果持续升高温度超过 1s,则在 timer 发出数据;如果期间温度降低,则取消 timer。
- FreezingMonitor:使用了 OutputTag[X] 输出到多个 stream。
- ReadingFilter:使用了 CoProcessFunction 来处理两个 stream 协同的场景。

6.3 Window Operator
window 的作用,即将 events 归到一个 bucket,然后基于 bucket 内有限的数据计算。

- Tumbling Windows: TumblingEventTimeWindows.of、TumblingProcessingTimeWindows.of,默认对齐到 epoch,也可以指定 offset 参数。
- Sliding Windows: SlidingEventTimeWindows.of、SlidingProcessingTimeWindows.of
- Session Window: EventTimeSessionWindows.withGap、ProcessingTimeSessionWindows.withGap

作用于 window 的 function 主要有三类:

1. ReduceFunction: 比如计算窗口里的最大值、最小值等。
2. AggregateFunction: 相比 1 更加灵活,不再限制数据类型,子类需要 override 创建初始值、累加、获取结果、merge 方法。1、2 对 state 使用都较小,因为记录的都是 aggregate 的值。
3. ProcessWindowFunction: 如果要计算一个窗口内的中间值,就依赖遍历 window 的数据了;所有数据在底层通过 ListState 存储,因此可能变得非常大,最好想办法变成 incrementally aggregated。

自定义 window 由三部分组成:assigner、trigger、evictor。窗口数据的存储方式有:incremental aggregation function(记录 aggregation 值)、full window function(记录全部 event,使用 ListState,见原书图示)以及两者的 mix。可以通过 extends WindowAssigner 实现自定义的窗口范围;extends Trigger 用于触发窗口数据的计算和结果返回,onElement、onEventTime、onProcessingTime 返回 TriggerResult(CONTINUE、FIRE、PURGE、FIRE_AND_PURGE 等枚举值),可以参考内置的 class EventTimeTrigger extends Trigger<Object, TimeWindow> 实现。Evictor 是可选项,似乎仅在 Non Incremental Aggregation Function 里才有意义,没太看懂,具体可以翻翻 TopSpeedWindowing 的例子代码。

6.4 Joining Streams on Time
Interval Join: INNER JOIN 的语义,且仅支持 Event Time。如原书配图所示,表示 A 会选择 B 里 [-1hour, +15min] 时间范围、相同 key 的数据;如果 JOIN 不到,则忽略该数据。对应代码实现形如(完整的 intervalJoin 调用示意见下文):

```scala
input1
  .keyBy(…)
  .between(<lower-bound>, <upper-bound>) // bounds with respect to input1
  .process(ProcessJoinFunction)          // process pairs of matched events
```

B 也是对称的行为,即 JOIN A 对应时间范围内的数据。按照上述行为,State 里就需要存储:

- A 里 >= CurrentWatermark - 15Min 的数据(B 可能会 JOIN)
- B 里 >= CurrentWatermark - 1Hour 的数据(A 可能会 JOIN)

如果两者的 watermark 对不齐,那则取决于更慢的那条流。注:此时 State 可能会遇到读写、大小的瓶颈。

Window Join: Tumbling Window Join 的效果见原书图示。
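上面 Interval Join 的伪代码省略了 intervalJoin 调用本身,这里补一段更完整的调用示意(非书中原文,按 Flink 1.7+ 的 KeyedStream.intervalJoin API 写;两条流的类型假设均为 SensorReading):

```scala
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

val streamA: DataStream[SensorReading] = ???
val streamB: DataStream[SensorReading] = ???

// A 按 [-1hour, +15min] 的时间范围去 JOIN B 中相同 key 的数据
val joined: DataStream[(SensorReading, SensorReading)] = streamA
  .keyBy(_.id)
  .intervalJoin(streamB.keyBy(_.id))
  .between(Time.hours(-1), Time.minutes(15))
  .process(
    new ProcessJoinFunction[SensorReading, SensorReading, (SensorReading, SensorReading)] {
      override def processElement(
          left: SensorReading,
          right: SensorReading,
          ctx: ProcessJoinFunction[SensorReading, SensorReading, (SensorReading, SensorReading)]#Context,
          out: Collector[(SensorReading, SensorReading)]): Unit = {
        // 每一对匹配上的 (A, B) 记录都会回调一次
        out.collect((left, right))
      }
    })
```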
6.5 Handling Late Data
处理迟到的数据有三种方式:Drop、Redirecting、Updating Results By Including Late Events。

Redirecting 主要依赖 Side-Output feature:

- Window Operator With Side-Output: .timeWindow(...).sideOutputLateData(...)
- 在 ProcessFunction 里比较:

```scala
class LateReadingsFilter
    extends ProcessFunction[SensorReading, SensorReading] {

  val lateReadingsOut = new OutputTag[SensorReading]("late-readings")

  override def processElement(
      r: SensorReading,
      ctx: ProcessFunction[SensorReading, SensorReading]#Context,
      out: Collector[SensorReading]): Unit = {
    // compare record timestamp with current watermark
    if (r.timestamp < ctx.timerService().currentWatermark()) {
      // this is a late reading => redirect it to the side output
      ctx.output(lateReadingsOut, r)
    } else {
      out.collect(r)
    }
  }
}
```

allowedLateness 允许迟到的数据再次参与 window 的计算(潜在行为即 window 的数据保留时间更长),用法见下面的示意。
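下面把 sideOutputLateData 和 allowedLateness 串成一段窗口算子的使用示意(非书中原文;假设 env 已使用 Event Time、SensorReading 为书中示例类型,窗口和容忍时长均为示例值):

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

// 假设该流已经 assignTimestampsAndWatermarks
val readings: DataStream[SensorReading] = ???

// 迟到数据的 side output 标签
val lateReadingsTag = new OutputTag[SensorReading]("late-readings")

val maxTempPerWindow: DataStream[SensorReading] = readings
  .keyBy(_.id)
  .timeWindow(Time.minutes(1))
  // watermark 之后再迟到最多 1 分钟的数据,仍会触发窗口结果的更新
  .allowedLateness(Time.minutes(1))
  // 超过 allowedLateness 的数据重定向到 side output,而不是直接丢弃
  .sideOutputLateData(lateReadingsTag)
  .reduce((r1, r2) => if (r1.temperature > r2.temperature) r1 else r2)

// 从结果流上取出迟到数据,单独处理(例如落盘或打日志)
val lateReadings: DataStream[SensorReading] = maxTempPerWindow
  .getSideOutput(lateReadingsTag)
```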

本文介绍了Apache Flink的有状态流处理的基本概念和架构,包括流处理的基本操作、时间语义、状态管理、检查点和恢复、开发环境搭建、DataStream API的使用、基于时间的操作和窗口操作、流的连接和迟到数据处理等内容。

Unity Catalog Lakeguard: Industry-first and only data governance for multi-user Apache Spark™ clusters

原文英文,约1100词,阅读约需4分钟。发表于:

We are thrilled to announce Unity Catalog Lakeguard , which allows you to run Apache Spark™ workloads in SQL, Python, and Scala with...

Unity Catalog Lakeguard 允许用户在 Databricks Data Intelligence 平台上以 SQL、Python 和 Scala 运行 Apache Spark 工作负载,并提供完整的数据治理。它通过在共享集群上将每个用户的代码与其他用户的代码以及 Spark 引擎相互隔离来强制执行数据治理,从而可以安全地共享集群,降低计算成本和运维负担。Lakeguard 还用于隔离 Databricks SQL 仓库中的 Python UDF。借助 Lakeguard,Databricks 客户可以以 SQL、Python 和 Scala 运行工作负载,并获得完整的数据治理。

利用 Amazon EMR Serverless、Amazon Athena、Apache Dolphinscheduler 以及本地 TiDB 和 HDFS 在混合部署环境中构建无服务器数据仓库(三)EMR Serverless 操作要点、优化以及开放集成测试

原文约7700字,阅读约需19分钟。发表于:

在数据驱动的世界中,企业正在寻求可靠且高性能的解决方案来管理其不断增长的数据需求。本系列博客从一个重视数据安全和合规性的 B2C 金融科技客户的角度来讨论云上云下混合部署的情况下如何利用亚马逊云科技云原生服务、开源社区产品以及第三方工具构建无服务器数据仓库的解耦方法。

本文介绍了如何利用亚马逊云科技的EMR Serverless服务构建无服务器数据仓库,包括操作要点、优化和开放集成测试。还提到了优化EMR Serverless性能的方法和EMR Studio的功能。总结了无服务器数据仓库平台的优势。

利用 Amazon EMR Serverless、Amazon Athena、Apache Dolphinscheduler 以及本地 TiDB 和 HDFS 在混合部署环境中构建无服务器数据仓库(二)Apache DolphinScheduler 集成以及 LOB 粒度资源消费分析

原文约5700字,阅读约需14分钟。发表于:

在数据驱动的世界中,企业正在寻求可靠且高性能的解决方案来管理其不断增长的数据需求。本系列博客从一个重视数据安全和合规性的 B2C 金融科技客户的角度来讨论云上云下混合部署的情况下如何利用亚马逊云科技云原生服务、开源社区产品以及第三方工具构建无服务器数据仓库的解耦方法。

本文介绍了利用亚马逊云科技的云原生服务和开源社区产品构建无服务器数据仓库的解耦方法,集成Apache DolphinScheduler和EMR Serverless实现高效可靠的数据编排和处理,切换DolphinScheduler的存储层到S3并通过S3上传作业脚本,以及通过标签机制实现LOB粒度的资源消费分析。

利用 Amazon EMR Serverless、Amazon Athena、Apache Dolphinscheduler 以及本地 TiDB 和 HDFS 在混合部署环境中构建无服务器数据仓库(一)云上云下数据同步方案设计

原文约7700字,阅读约需19分钟。发表于:

在数据驱动的世界中,企业正在寻求可靠且高性能的解决方案来管理其不断增长的数据需求。本系列博客从一个重视数据安全和合规性的 B2C 金融科技客户的角度来讨论云上云下混合部署的情况下如何利用亚马逊云科技云原生服务、开源社区产品以及第三方工具构建无服务器数据仓库的解耦方法。

本文介绍了金融科技客户如何利用亚马逊云科技的无服务器数据仓库解耦方法来管理数据需求,包括使用 Amazon EMR Serverless、Apache DolphinScheduler 和 Amazon Athena 等工具实现数据同步和处理。同时介绍了云上云下数据同步的架构设计和解决方案,以及数据完整性检查方法和亚马逊云科技 DataSync 的保证机制。亚马逊云科技提供丰富的产品和服务,满足金融科技客户的数据安全和合规性需求。

Spring for Apache Pulsar 1.0.5 available now

原文英文,约100词,阅读约需1分钟。发表于:

On behalf of the team and everyone who has contributed, I’m happy to announce that Spring for Apache Pulsar 1.0.5 has been released and is now available from Maven Central. This release will be included in the upcoming Spring Boot 3.2.5 release. This release includes several dependency upgrades. Please see the release notes for more details.

Spring for Apache Pulsar 1.0.5已发布,可从Maven Central获取。此版本包括多个依赖升级。

Spring for Apache Kafka 3.0.16, 3.1.4 and 3.2.0-RC1 Available Now

原文英文,约300词,阅读约需2分钟。发表于:

On behalf of the entire team and everyone in the community who contributed, we are pleased to announce the general availability of Spring for Apache Kafka 3.0.16 and 3.1.4. Both of these GA releases include a few improvements and bug fixes. For more details, see the following change logs:

- https://github.com/spring-projects/spring-kafka/releases/tag/v3.0.16
- https://github.com/spring-projects/spring-kafka/releases/tag/v3.1.4

Spring Boot 3.1.11 and 3.2.5 will include Spring for Apache Kafka 3.0.16 and 3.1.4, respectively.

In addition, we are pleased to announce the first release candidate for Spring for Apache Kafka 3.2.0 (3.2.0-RC1). This release candidate includes new additions, feature enhancements, bug fixes, and documentation updates. Some notable improvements include:

- @PartitionOffset now supports TopicPartitionOffset.SeekPosition
- More observability-related enhancements
- Since most of the Apache Kafka-specific native hints have been migrated to the graalvm-reachability-metadata repository, those hints were removed from Spring for Apache Kafka
- Optionally allow a Kafka Streams processor to leave the consumer group when closing the streams

For more details on all the changes in 3.2.0-RC1, see https://github.com/spring-projects/spring-kafka/releases/tag/v3.2.0-RC1. The upcoming Spring Boot 3.3.0-RC1 will include the 3.2.0-RC1 version of Spring for Apache Kafka. The next step on the release journey is the GA release for 3.2.0 on May 20th. During the next few weeks, we will address any reported bugs, add more testing, improve documentation, etc.

We want to express our gratitude to everyone who contributed to these releases. Your contributions are invaluable and play a crucial role in the continuous improvement of Spring for Apache Kafka.

Staying in touch with the project — the usual avenues to engage: GitHub | Issues | Documentation | Stack Overflow

Spring for Apache Kafka 3.0.16, 3.1.4, and 3.2.0-RC1现已发布,包含改进、错误修复和新功能。3.2.0的GA版本计划于5月20日发布。感谢社区的贡献。

Spring for Apache Pulsar 1.1.0-RC1 available now

原文英文,约100词,阅读约需1分钟。发表于:

On behalf of the team and everyone who has contributed, I’m happy to announce that Spring for Apache Pulsar 1.1.0-RC1 has been released and is available from https://repo.spring.io/milestone now! This release will be included in the upcoming Spring Boot 3.3.0-RC1 release. This release includes numerous enhancements, documentation improvements, bug fixes, and dependency upgrades. Notable new features include: Transaction support for @PulsarListener and PulsarTemplate Please see the release notes for more details.

Spring for Apache Pulsar 1.1.0-RC1已发布,可从https://repo.spring.io/milestone下载。此版本包括事务支持和其他改进。
