文章目录1. flink安装部署和启动1.1 下载flink安装包并解压到文件目录下1.2 进入解压后的flink安装目录下的bin目录启动flink服务2. flink-sql写入paimon数据2.1 导入相关包至flink安装目录下的lib目录2.2 进入flink安装目录下的bin目录启动flink的client客户端2.3. 创建paimon的catalog并使用2.4. 创建数据库并使用2.5 创建简单表并插入3条数据3. java代码读取paimon数据1. flink安装部署和启动1.1 下载flink安装包并解压到文件目录下根据需要下载flink安装包链接 flinktar -zxvf flink-1.19.1-bin-scala_2.12.tgz1.2 进入解压后的flink安装目录下的bin目录启动flink服务cd bin cd ./start-cluster.sh2. flink-sql写入paimon数据2.1 导入相关包至flink安装目录下的lib目录1 存储本地文件系统若paimon数据存储于本地文件系统则导入paimon-flink包如paimon-flink-1.19-1.0.1.jar包版本依据实际需要导入。2 存储HDFS文件系统若paimon数据存储于HDFS文件系统则需导入hadoop相关的包hadoop-hdfs-3.3.6.jar、hadoop-client-3.3.6.jar、hadoop-common-3.3.6.jar、hadoop-client-runtime-3.3.4.jar、hadoop-client-api-3.3.5.jar。2.2 进入flink安装目录下的bin目录启动flink的client客户端./sql-client.sh2.3. 创建paimon的catalog并使用若存储在服务器文件系统上将warehouse的值改为文件夹路径即可如/home/paimondb/此处写入的是HDFS文件系统。createcatalog paimonCatalogwith(typepaimon,warehousehdfs://localhost:9000/paimon/warehouse);创建成功后当前会话中会包含两个catalog一个是默认的default_catalog另一个是自己创建成功的paimonCatalog。2.4. 创建数据库并使用CREATEDATABASEIFNOTEXISTStestdb;USEtestdb;同catalog一样新建的catalog下会生成一个默认的数据库default通过【show databases;】命令可以查看创建的表也可也通过系统文件或HDFS文件系统中查看数据库格式为【/paimon/warehouse/default.db】和【/paimon/warehouse/testdb.db】。2.5 创建简单表并插入3条数据CREATETABLEuser_table(user_idBIGINT,user_name STRING,email STRING,register_timeTIMESTAMP(3),idCard STRING,PRIMARYKEY(user_id)NOTENFORCED);INSERTINTOuser_tableVALUES(1,test,123qq.com,TIMESTAMP2025-01-15 00:00:00,440521199703011234);INSERTINTOuser_tableVALUES(2,test2,123qq.com,TIMESTAMP2026-01-15 00:00:00,440521199703011234),(3,测试3,123qq.com,TIMESTAMP2026-01-15 00:00:00,440521199703011234);默认流式展示查询结果可通过如下配置变换查询结果的模式setsql-client.execution.result-modetableau;resetexecution.checkpoionting.interval;setexecution.runtime-modebatch;select*fromuser_table;3. java代码读取paimon数据publicstaticvoidmain(String[]args){StringcatalogNamepaimon_catalog;StringwarehousePathhdfs://localhost:9000/paimon/warehouse;org.apache.hadoop.conf.ConfigurationhadoopConfneworg.apache.hadoop.conf.Configuration();hadoopConf.set(fs.defaultFS,hdfs://localhost:9000);ConfigurationflinkConfnewConfiguration();// 构建TableEnvironment, 配置读取方式为 批式.inBatchMode()/流式.inStreamingMode()EnvironmentSettings.BuildersettingsBuilderEnvironmentSettings.newInstance().inBatchMode();// 应用自定义配置EnvironmentSettingssettingssettingsBuilder.withConfiguration(flinkConf).build();// 创建TableEnvironmentTableEnvironmenttEnvTableEnvironment.create(settings);System.out.println(1. tEnv 创建成功);// 1. 创建初始 Catalog无权限控制StringcreateCatalogSQLString.format(CREATE CATALOG %s WITH (type paimon, warehouse %s),catalogName,warehousePath);tEnv.executeSql(createCatalogSQL);System.out.println(2. catalog 创建成功);// 切换到创建的catalog下tEnv.useCatalog(catalogName);// 获取所有的数据库String[]databaseListtEnv.listDatabases();for(Stringdatabase:databaseList){// 一般不会使用默认数据库所以可以跳过若有需要可删除此跳过语句if(database.equals(default)){continue;}// 切换数据库tEnv.useDatabase(database);System.out.println(3. use database: database);String[]tableListtEnv.listTables();for(Stringtable:tableList){System.out.println( table: table );StringtestSQLString.format(SELECT * FROM %s ,table);tEnv.executeSql(testSQL).print();System.out.println( );}System.out.println(4. use database: database 结束);}}