Flink编程模型与API
Flink 异步IO机制异步IO机制原理Flink的异步I/O是一个非常受欢迎的特性由阿里巴巴贡献给社区并在1.2版本中引入,它的主要目的是解决与外部系统交互时网络延迟成为系统瓶颈的问题外部系统往往是外部数据库。在Flink流计算系统中与外部数据库进行交互是常见的需求,通常情况下我们会发送一个查询请求到数据库并等待结果返回这期间无法发送其他请求这种同步访问方式会导致阻塞阻碍了吞吐量和延迟为了解决这个问题引入了异步模式能够并发地处理多个到外部数据库的请求下图为官方提供的异步IO原理图。在Flink中使用异步I/O我们可以连续发送多个查询请求到数据库并在回复返回时处理每个回复而不需要阻塞等待这种并发处理的方式极大地减少了延迟。异步I/O专门用于解决Flink计算过程中与外部系统的交互问题特别需要注意的是为了提高Flink与外部系统交互能力也可以提高Flink的并行度进而提高Flink处理数据的吞吐量这种方式会付出更高的资源成本如更多的task、更多的内存缓存、更高的网络连接,而异步IO方式相对这种方式是基于某一个task之上的扩展重复利用一个task资源做更多的事情提高了Flink性能和资源利用率。异步IO使用前提在Flink中查询外界数据库数据时要使用异步IO需要满足如下条件之一数据库或K/V存储系统提供支持异步请求的客户端例如Java的Vertx。对于不支持异步请求客户端的外部系统可以使用线程池模拟异步客户端。注意:Java Vertx是一个基于JVM的应用平台适用于移动端后台、互联网和企业应用架构,采用了基于Netty的全异步通信,支持多种语言目前Vertx的异步驱动已经支持了Postgres、MySQL、MongoDB、Redis等常用组件可以作为异步连接这些数据库的工具。异步IO代码数据准备异步请求客户端yi/** * 实现Flink异步IO方式一:使用 Vert.x 实现异步 IO * 案例读取MySQL中的数据 */ public class AsyncIOTest1 { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(); //为了测试效果这里设置并行度为1 env.setParallelism(1); //准备数据流 DataStreamSourceInteger idDS env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); /** * 使用异步IO参数解释如下 * 第一个参数是输入数据流 * 第二个参数是异步IO的实现类 * 第三个参数是用于完成异步操作超时时间 * 第四个参数是超时时间单位 * 第五个参数可以触发的最大异步i/o操作数 */ AsyncDataStream.unorderedWait(idDS, new AsyncDatabaseRequest1(), 5000, TimeUnit.MILLISECONDS, 10) .print(); env.execute(); } } class AsyncDatabaseRequest1 extends RichAsyncFunctionInteger, String { //定义JDBCClient共享对象 JDBCClient mysqlClient null; //初始化资源连接Mysql Override public void open(Configuration parameters) throws Exception { //创建连接mysql配置对象 JsonObject config new JsonObject() .put(url, jdbc:mysql://node2:3306/mydb?useSSLfalse) .put(driver_class, com.mysql.jdbc.Driver) .put(user, root) .put(password, 123456); //创建VertxOptions对象 VertxOptions vo new VertxOptions(); //设置Vertx要使用的事件循环线程数 vo.setEventLoopPoolSize(10); //设置Vertx要使用的最大工作线程数 vo.setWorkerPoolSize(20); //创建Vertx对象 Vertx vertx Vertx.vertx(vo); //创建JDBCClient共享对象多个Vertx 客户端可以共享一个JDBCClient对象 mysqlClient JDBCClient.createShared(vertx, config); } //实现异步IO的方法第一个参数是输入第二个参数是异步IO返回的结果 Override public void asyncInvoke(Integer input, ResultFutureString resultFuture) { mysqlClient.getConnection(new HandlerAsyncResultSQLConnection() { Override public void handle(AsyncResultSQLConnection sqlConnectionAsyncResult) { if (sqlConnectionAsyncResult.failed()) { System.out.println(获取连接失败 sqlConnectionAsyncResult.cause().getMessage()); return; } //获取连接 SQLConnection connection sqlConnectionAsyncResult.result(); //执行查询 connection.query(select id,name,age from async_tbl where id input, new HandlerAsyncResultio.vertx.ext.sql.ResultSet() { Override public void handle(AsyncResultio.vertx.ext.sql.ResultSet resultSetAsyncResult) { if (resultSetAsyncResult.failed()) { System.out.println(查询失败 resultSetAsyncResult.cause().getMessage()); return; } //获取查询结果 io.vertx.ext.sql.ResultSet resultSet resultSetAsyncResult.result(); //打印查询的结果 //将查询结果返回给Flink resultSet.getRows().forEach(row - { resultFuture.complete(Collections.singletonList(row.encode())); }); } }); } }); } /** * 异步IO超时处理逻辑主要避免程序出错。参数如下: * 第一个参数是输入数据 * 第二个参数是异步IO返回的结果 */ Override public void timeout(Integer input, ResultFutureString resultFuture) throws Exception { resultFuture.complete(Collections.singletonList(异步IO超时)); } //关闭资源 Override public void close() throws Exception { mysqlClient.close(); } }Scala代码实现:/** * 实现Flink异步IO方式一:使用 Vert.x 实现异步 IO * 案例读取MySQL中的数据 */ object AsyncIOTest1 { def main(args: Array[String]): Unit { val env: StreamExecutionEnvironment StreamExecutionEnvironment.getExecutionEnvironment //为了测试效果这里设置并行度为1 env.setParallelism(1) //导入隐式转换 import org.apache.flink.streaming.api.scala._ //准备数据流 val idDS: DataStream[Int] env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) AsyncDataStream.unorderedWait(idDS, new AsyncDatabaseRequest1(), 5000, java.util.concurrent.TimeUnit.MILLISECONDS, 10) .print() env.execute() } } class AsyncDatabaseRequest1 extends RichAsyncFunction[Int,String]() { //定义JDBCClient对象 var mysqlClient: JDBCClient null //初始化资源,连接MySQL override def open(parameters: Configuration): Unit { //创建连接MySQL的配置信息 val config: JsonObject new JsonObject() .put(url, jdbc:mysql://node2:3306/mydb?useSSLfalse) .put(driver_class, com.mysql.jdbc.Driver) .put(user, root) .put(password, 123456) //创建VertxOptions对象 val vo new VertxOptions() //设置Vertx要使用的事件循环线程数 vo.setEventLoopPoolSize(10) //设置Vertx要使用的最大工作线程数 vo.setWorkerPoolSize(20) //创建Vertx对象 val vertx io.vertx.core.Vertx.vertx(vo) //创建 JDBCClient 共享对象多个Vertx客户端可以共享一个JDBCClient实例 mysqlClient JDBCClient.createShared(vertx, config) } //实现异步IO的方法,第一个参数是输入第二个参数是异步IO返回的结果 override def asyncInvoke(input: Int, resultFuture: ResultFuture[String]): Unit { //获取MySQL连接 mysqlClient.getConnection(new Handler[AsyncResult[SQLConnection]] { override def handle(sqlConnectionAsyncResult: AsyncResult[SQLConnection]): Unit { if(!sqlConnectionAsyncResult.failed()){ //获取连接 val connection : SQLConnection sqlConnectionAsyncResult.result() //执行查询 connection.query(select id,name,age from async_tbl where id input,new Handler[AsyncResult[ResultSet]] { override def handle(resultSetAsyncResult: AsyncResult[ResultSet]): Unit { if(!resultSetAsyncResult.failed()){ //获取查询结果 val resultSet: ResultSet resultSetAsyncResult.result() resultSet.getRows().asScala.foreach(row{ //返回结果 resultFuture.complete(List(row.encode())) }) } } }) } } }) } /** * 异步IO超时处理逻辑主要避免程序出错。参数如下: * param input 输入数据 * param resultFuture 异步IO返回的结果 */ override def timeout(input: Int, resultFuture: ResultFuture[String]): Unit { resultFuture.complete(List(异步IO超时)) } //关闭资源 override def close(): Unit { mysqlClient.close() //关闭连接 } }线程池模拟异步请求客户端/** * 实现Flink异步IO方式二:线程池模拟异步客户端 * 案例读取MySQL中的数据 */ public class AsyncIOTest2 { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(); //为了测试效果这里设置并行度为1 env.setParallelism(1); //准备数据流 DataStreamSourceInteger idDS env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); /** * 使用异步IO参数解释如下 * 第一个参数是输入数据流 * 第二个参数是异步IO的实现类 * 第三个参数是用于完成异步操作超时时间 * 第四个参数是超时时间单位 * 第五个参数可以触发的最大异步i/o操作数 */ AsyncDataStream.unorderedWait(idDS, new AsyncDatabaseRequest2(), 5000, TimeUnit.MILLISECONDS, 10) .print(); env.execute(); } } class AsyncDatabaseRequest2 extends RichAsyncFunctionInteger, String { //准备线程池对象 ExecutorService executorService null; //初始化资源这里主要是初始化线程池 Override public void open(Configuration parameters) throws Exception { //初始化线程池,第一个参数是线程池中线程的数量第二个参数是线程池中线程的最大数量第三个参数是线程池中线程空闲的时间第四个参数是线程池中线程空闲时间的单位第五个参数是线程池中的任务队列 executorService new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueueRunnable()); } Override public void asyncInvoke(Integer input, ResultFutureString resultFuture) throws Exception { //提交异步任务到线程池中 executorService.submit(new Runnable() { Override public void run() { try { /** * 以下两个方法不能设置在open方法中因为多线程共用数据库连接和pst对象这样会导致线程不安全 */ Connection conn DriverManager.getConnection(jdbc:mysql://node2:3306/mydb?useSSLfalse, root, 123456); PreparedStatement pst conn.prepareStatement(select id,name,age from async_tbl where id ?); //设置参数 pst.setInt(1, input); //执行查询并获取结果 ResultSet resultSet pst.executeQuery(); //遍历结果集 while (resultSet ! null resultSet.next()) { //获取数据 int id resultSet.getInt(id); String name resultSet.getString(name); int age resultSet.getInt(age); //返回结果 resultFuture.complete(Arrays.asList(idid,namename,ageage)); } //关闭资源 pst.close(); conn.close(); } catch (Exception e) { e.printStackTrace(); } } }); } /** * 异步IO超时处理逻辑主要避免程序出错。参数如下: * 第一个参数是输入数据 * 第二个参数是异步IO返回的结果 */ Override public void timeout(Integer input, ResultFutureString resultFuture) throws Exception { resultFuture.complete(Collections.singletonList(异步IO超时)); } //关闭资源 Override public void close() throws Exception { //关闭线程池 executorService.shutdown(); } }/** * 实现Flink异步IO方式二:线程池模拟异步客户端 * 案例读取MySQL中的数据 */ object AsyncIOTest2 { def main(args: Array[String]): Unit { val env: StreamExecutionEnvironment StreamExecutionEnvironment.getExecutionEnvironment //为了测试效果这里设置并行度为1 env.setParallelism(1) //导入隐式转换 import org.apache.flink.streaming.api.scala._ //准备数据流 val idDS: DataStream[Int] env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) /** * 使用异步IO参数解释如下 * 第一个参数是输入数据流 * 第二个参数是异步IO的实现类 * 第三个参数是用于完成异步操作超时时间 * 第四个参数是超时时间单位 * 第五个参数可以触发的最大异步i/o操作数 */ AsyncDataStream.unorderedWait(idDS, new AsyncDatabaseRequest2(), 5000, java.util.concurrent.TimeUnit.MILLISECONDS, 10) .print() env.execute() } } class AsyncDatabaseRequest2 extends RichAsyncFunction[Int,String]() { //准备线程池对象 var executorService: ExecutorService null //初始化资源,准备线程池 override def open(parameters: Configuration): Unit { //初始化线程池,第一个参数是线程池中线程的数量第二个参数是线程池中线程的最大数量第三个参数是线程池中线程空闲的时间第四个参数是线程池中线程空闲时间的单位第五个参数是线程池中的任务队列 executorService new ThreadPoolExecutor(10,10,0L,java.util.concurrent.TimeUnit.MILLISECONDS, new java.util.concurrent.LinkedBlockingQueue[Runnable]()) } //多线程方式处理数据 override def asyncInvoke(input: Int, resultFuture: ResultFuture[String]): Unit { //使用线程池执行异步任务 executorService.submit(new Runnable { override def run(): Unit { /** * 以下两个方法不能设置在open方法中因为多线程共用数据库连接和pst对象这样会导致线程不安全 */ val conn: Connection DriverManager.getConnection(jdbc:mysql://node2:3306/mydb?useSSLfalse, root, 123456) val pst: PreparedStatement conn.prepareStatement(select id,name,age from async_tbl where id ?) //设置参数 pst.setInt(1, input) //执行查询并获取结果 val rs pst.executeQuery() while(rs!null rs.next()){ val id: Int rs.getInt(id) val name: String rs.getString(name) val age: Int rs.getInt(age) //将结果返回给Flink resultFuture.complete(List(id id,name name,age age)) } //关闭资源 pst.close(); conn.close(); } }) } /** * 异步IO超时处理逻辑主要避免程序出错。参数如下: * 第一个参数是输入数据 * 第二个参数是异步IO返回的结果 */ override def timeout(input: Int, resultFuture: ResultFuture[String]): Unit { resultFuture.complete(List(异步IO超时了)) } //关闭资源 override def close(): Unit { //关闭线程池 executorService.shutdown() } }