From 2a04a6774dac55c7b7d0e4eb5a56c97ee4ed0164 Mon Sep 17 00:00:00 2001 From: meiyifan Date: Tue, 6 Aug 2024 00:46:18 +0800 Subject: [PATCH 1/6] 1 --- .../src/main/java/com/atguigu/tms/realtime/app/ods/OdsApp.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tms-realtime/src/main/java/com/atguigu/tms/realtime/app/ods/OdsApp.java b/tms-realtime/src/main/java/com/atguigu/tms/realtime/app/ods/OdsApp.java index e85b5bb..2da00c0 100644 --- a/tms-realtime/src/main/java/com/atguigu/tms/realtime/app/ods/OdsApp.java +++ b/tms-realtime/src/main/java/com/atguigu/tms/realtime/app/ods/OdsApp.java @@ -15,7 +15,7 @@ import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.Collector; /** - * @author Felix + * @author MeiYifan * @date 2023/5/15 * ODS数据的采集 */ -- Gitee From 27b6cc9e0e70b765b97ecec86af42564b7f63a18 Mon Sep 17 00:00:00 2001 From: meiyifan Date: Tue, 6 Aug 2024 00:50:02 +0800 Subject: [PATCH 2/6] =?UTF-8?q?=E6=95=B0=E6=8D=AE=E6=8E=A5=E5=8F=A3?= =?UTF-8?q?=E5=BC=80=E5=8F=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/atguigu/tms/realtime/app/func/DimAsyncFunction.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tms-realtime/src/main/java/com/atguigu/tms/realtime/app/func/DimAsyncFunction.java b/tms-realtime/src/main/java/com/atguigu/tms/realtime/app/func/DimAsyncFunction.java index 1523325..2c2228a 100644 --- a/tms-realtime/src/main/java/com/atguigu/tms/realtime/app/func/DimAsyncFunction.java +++ b/tms-realtime/src/main/java/com/atguigu/tms/realtime/app/func/DimAsyncFunction.java @@ -14,7 +14,7 @@ import java.util.Collections; import java.util.concurrent.ExecutorService; /** - * @author Felix + * @author MeiYifan * @date 2023/5/30 * 发送异步请求进行维度关联 * 模板方法设计模式: -- Gitee From d6988ac2d5f2a8b5009d73e0c6835787de1758b6 Mon Sep 17 00:00:00 2001 From: meiyifan Date: Wed, 7 Aug 2024 10:18:58 +0800 Subject: [PATCH 3/6] =?UTF-8?q?CreateEnvUtil.java=E4=BF=AE=E6=94=B9?= =?UTF-8?q?=EF=BC=8C=E8=AF=A5=E5=BA=93=E5=90=8D=EF=BC=8C=E5=8F=96=E6=B6=88?= =?UTF-8?q?ck=E5=9C=B0=E5=9D=80=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../tms/realtime/utils/CreateEnvUtil.java | 60 ++++++++++--------- 1 file changed, 32 insertions(+), 28 deletions(-) diff --git a/tms-realtime/src/main/java/com/atguigu/tms/realtime/utils/CreateEnvUtil.java b/tms-realtime/src/main/java/com/atguigu/tms/realtime/utils/CreateEnvUtil.java index d2ee3a9..f82f09e 100644 --- a/tms-realtime/src/main/java/com/atguigu/tms/realtime/utils/CreateEnvUtil.java +++ b/tms-realtime/src/main/java/com/atguigu/tms/realtime/utils/CreateEnvUtil.java @@ -42,18 +42,18 @@ public class CreateEnvUtil { env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.days(1),Time.seconds(3))); //2.6 设置状态后端 env.setStateBackend(new HashMapStateBackend()); - env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/tms/ck"); +// env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/tms/ck"); //2.7 设置操作hdfs的用户 //获取命令行参数 ParameterTool parameterTool = ParameterTool.fromArgs(args); - String hdfsUserName = parameterTool.get("hadoop-user-name", "atguigu"); - System.setProperty("HADOOP_USER_NAME",hdfsUserName); +// String hdfsUserName = parameterTool.get("hadoop-user-name", "atguigu"); +// System.setProperty("HADOOP_USER_NAME",hdfsUserName); return env; } //获取MySqlSource public static MySqlSource getMySqlSource(String option,String serverId,String[] args){ ParameterTool parameterTool = ParameterTool.fromArgs(args); - String mysqlHostname = parameterTool.get("mysql-hostname", "hadoop102"); + String mysqlHostname = parameterTool.get("mysql-hostname", "localhost"); int mysqlPort = Integer.parseInt(parameterTool.get("mysql-port", "3306")); String mysqlUsername = parameterTool.get("mysql-username", "root"); String mysqlPasswd = parameterTool.get("mysql-passwd", "123456"); @@ -75,47 +75,51 @@ public class CreateEnvUtil { .password(mysqlPasswd) .deserializer(jsonDebeziumDeserializationSchema); + String dwddatabase = "imrds_uat"; + String dimdatabase = "tms_config"; + + //tms01 switch (option){ //读取事实数据 case "dwd": - String[] dwdTables = new String[]{"tms.order_info", - "tms.order_cargo", - "tms.transport_task", - "tms.order_org_bound"}; + String[] dwdTables = new String[]{dwddatabase+".order_info", + dwddatabase+".order_cargo", + dwddatabase+".transport_task", + dwddatabase+".order_org_bound"}; return builder - .databaseList("tms") + .databaseList(dwddatabase) .tableList(dwdTables) - .startupOptions(StartupOptions.latest()) + .startupOptions(StartupOptions.initial()) .serverId(serverId) .build(); //读取维度数据 case "realtime_dim": - String[] realtimeDimTables = new String[]{"tms.user_info", - "tms.user_address", - "tms.base_complex", - "tms.base_dic", - "tms.base_region_info", - "tms.base_organ", - "tms.express_courier", - "tms.express_courier_complex", - "tms.employee_info", - "tms.line_base_shift", - "tms.line_base_info", - "tms.truck_driver", - "tms.truck_info", - "tms.truck_model", - "tms.truck_team"}; + String[] realtimeDimTables = new String[]{dwddatabase+".user_info", + dwddatabase+".user_address", + dwddatabase+".base_complex", + dwddatabase+".base_dic", + dwddatabase+".base_region_info", + dwddatabase+".base_organ", + dwddatabase+".express_courier", + dwddatabase+".express_courier_complex", + dwddatabase+".employee_info", + dwddatabase+".line_base_shift", + dwddatabase+".line_base_info", + dwddatabase+".truck_driver", + dwddatabase+".truck_info", + dwddatabase+".truck_model", + dwddatabase+".truck_team"}; return builder - .databaseList("tms") + .databaseList(dwddatabase) .tableList(realtimeDimTables) .startupOptions(StartupOptions.initial()) .serverId(serverId) .build(); case "config_dim": return builder - .databaseList("tms_config") - .tableList("tms_config.tms_config_dim") + .databaseList(dimdatabase) + .tableList(dimdatabase+".tms_config_dim") .startupOptions(StartupOptions.initial()) .serverId(serverId) .build(); -- Gitee From 92710777187559336bf218e810d9daed890e94d7 Mon Sep 17 00:00:00 2001 From: meiyifan Date: Wed, 7 Aug 2024 10:32:16 +0800 Subject: [PATCH 4/6] =?UTF-8?q?pom=E4=BF=AE=E6=94=B9=EF=BC=8Cflink?= =?UTF-8?q?=E7=89=88=E6=9C=AC=E6=94=B9=E4=B8=BA16.2=20createDependencyRedu?= =?UTF-8?q?cedPom=20=E4=B8=BA=20false?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tms-realtime/pom.xml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tms-realtime/pom.xml b/tms-realtime/pom.xml index 278929a..baa341f 100644 --- a/tms-realtime/pom.xml +++ b/tms-realtime/pom.xml @@ -14,7 +14,8 @@ 8 UTF-8 1.8 - 1.17.0 + + 1.16.2 3.3.4 2.3.0 @@ -198,6 +199,7 @@ + false -- Gitee From 119fa6173167f622ac09bd24dffa08ebb819e008 Mon Sep 17 00:00:00 2001 From: meiyifan Date: Wed, 7 Aug 2024 21:44:21 +0800 Subject: [PATCH 5/6] =?UTF-8?q?=E6=A0=BC=E5=BC=8F=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../atguigu/tms/realtime/app/ods/OdsApp.java | 8 +- .../tms/realtime/utils/CreateEnvUtil.java | 94 +++++++++---------- 2 files changed, 51 insertions(+), 51 deletions(-) diff --git a/tms-realtime/src/main/java/com/atguigu/tms/realtime/app/ods/OdsApp.java b/tms-realtime/src/main/java/com/atguigu/tms/realtime/app/ods/OdsApp.java index 2da00c0..6e0414c 100644 --- a/tms-realtime/src/main/java/com/atguigu/tms/realtime/app/ods/OdsApp.java +++ b/tms-realtime/src/main/java/com/atguigu/tms/realtime/app/ods/OdsApp.java @@ -23,7 +23,7 @@ public class OdsApp { public static void main(String[] args) throws Exception { //TODO 1.获取流处理环境并指定检查点 StreamExecutionEnvironment env = CreateEnvUtil.getStreamEnv(args); - env.setParallelism(4); + env.setParallelism(2); //TODO 2.使用FlinkCDC从MySQL中读取数据-事实数据 String dwdOption = "dwd"; @@ -46,7 +46,7 @@ public class OdsApp { SingleOutputStreamOperator strDS = env .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), sourceName) - .setParallelism(1) +// .setParallelism(1) .uid(option + sourceName); //2.简单的ETL @@ -69,7 +69,9 @@ public class OdsApp { } } } - ).setParallelism(1); + ) +// .setParallelism(1) + ; //3.按照主键进行分组,避免出现乱序 KeyedStream keyedDS = processDS.keyBy( new KeySelector() { diff --git a/tms-realtime/src/main/java/com/atguigu/tms/realtime/utils/CreateEnvUtil.java b/tms-realtime/src/main/java/com/atguigu/tms/realtime/utils/CreateEnvUtil.java index f82f09e..47a83fc 100644 --- a/tms-realtime/src/main/java/com/atguigu/tms/realtime/utils/CreateEnvUtil.java +++ b/tms-realtime/src/main/java/com/atguigu/tms/realtime/utils/CreateEnvUtil.java @@ -24,7 +24,7 @@ import java.util.HashMap; */ public class CreateEnvUtil { //获取流处理环境 - public static StreamExecutionEnvironment getStreamEnv(String[] args){ + public static StreamExecutionEnvironment getStreamEnv(String[] args) { //TODO 1.基本环境准备 //1.1 指定流处理环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -39,92 +39,90 @@ public class CreateEnvUtil { //2.4 设置两个检查点之间的最小时间间隔 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000L); //2.5 设置重启策略 - env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.days(1),Time.seconds(3))); + env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.days(1), Time.seconds(3))); //2.6 设置状态后端 env.setStateBackend(new HashMapStateBackend()); // env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/tms/ck"); //2.7 设置操作hdfs的用户 - //获取命令行参数 - ParameterTool parameterTool = ParameterTool.fromArgs(args); + // String hdfsUserName = parameterTool.get("hadoop-user-name", "atguigu"); // System.setProperty("HADOOP_USER_NAME",hdfsUserName); return env; } //获取MySqlSource - public static MySqlSource getMySqlSource(String option,String serverId,String[] args){ + public static MySqlSource getMySqlSource(String option, String serverId, String[] args) { ParameterTool parameterTool = ParameterTool.fromArgs(args); String mysqlHostname = parameterTool.get("mysql-hostname", "localhost"); int mysqlPort = Integer.parseInt(parameterTool.get("mysql-port", "3306")); String mysqlUsername = parameterTool.get("mysql-username", "root"); String mysqlPasswd = parameterTool.get("mysql-passwd", "123456"); option = parameterTool.get("start-up-options", option); - serverId = parameterTool.get("server-id",serverId); + serverId = parameterTool.get("server-id", serverId); // 创建配置信息 Map 集合,将 Decimal 数据类型的解析格式配置 k-v 置于其中 HashMap config = new HashMap<>(); config.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, DecimalFormat.NUMERIC.name()); // 将前述 Map 集合中的配置信息传递给 JSON 解析 Schema,该 Schema 将用于 MysqlSource 的初始化 JsonDebeziumDeserializationSchema jsonDebeziumDeserializationSchema = - new JsonDebeziumDeserializationSchema(false, config); + new JsonDebeziumDeserializationSchema(false, config); MySqlSourceBuilder builder = MySqlSource.builder() - .hostname(mysqlHostname) - .port(mysqlPort) - .username(mysqlUsername) - .password(mysqlPasswd) - .deserializer(jsonDebeziumDeserializationSchema); + .hostname(mysqlHostname) + .port(mysqlPort) + .username(mysqlUsername) + .password(mysqlPasswd) + .deserializer(jsonDebeziumDeserializationSchema); String dwddatabase = "imrds_uat"; String dimdatabase = "tms_config"; //tms01 - switch (option){ + switch (option) { //读取事实数据 case "dwd": - String[] dwdTables = new String[]{dwddatabase+".order_info", - dwddatabase+".order_cargo", - dwddatabase+".transport_task", - dwddatabase+".order_org_bound"}; + String[] dwdTables = new String[]{dwddatabase + ".order_info", + dwddatabase + ".order_cargo", + dwddatabase + ".transport_task", + dwddatabase + ".order_org_bound"}; return builder - .databaseList(dwddatabase) - .tableList(dwdTables) - .startupOptions(StartupOptions.initial()) - .serverId(serverId) - .build(); + .databaseList(dwddatabase) + .tableList(dwdTables) + .startupOptions(StartupOptions.initial()) + .serverId(serverId) + .build(); //读取维度数据 case "realtime_dim": - String[] realtimeDimTables = new String[]{dwddatabase+".user_info", - dwddatabase+".user_address", - dwddatabase+".base_complex", - dwddatabase+".base_dic", - dwddatabase+".base_region_info", - dwddatabase+".base_organ", - dwddatabase+".express_courier", - dwddatabase+".express_courier_complex", - dwddatabase+".employee_info", - dwddatabase+".line_base_shift", - dwddatabase+".line_base_info", - dwddatabase+".truck_driver", - dwddatabase+".truck_info", - dwddatabase+".truck_model", - dwddatabase+".truck_team"}; + String[] realtimeDimTables = new String[]{dwddatabase + ".user_info", + dwddatabase + ".user_address", + dwddatabase + ".base_complex", + dwddatabase + ".base_dic", + dwddatabase + ".base_region_info", + dwddatabase + ".base_organ", + dwddatabase + ".express_courier", + dwddatabase + ".express_courier_complex", + dwddatabase + ".employee_info", + dwddatabase + ".line_base_shift", + dwddatabase + ".line_base_info", + dwddatabase + ".truck_driver", + dwddatabase + ".truck_info", + dwddatabase + ".truck_model", + dwddatabase + ".truck_team"}; return builder - .databaseList(dwddatabase) - .tableList(realtimeDimTables) - .startupOptions(StartupOptions.initial()) - .serverId(serverId) - .build(); + .databaseList(dwddatabase) + .tableList(realtimeDimTables) + .startupOptions(StartupOptions.initial()) + .serverId(serverId) + .build(); case "config_dim": return builder - .databaseList(dimdatabase) - .tableList(dimdatabase+".tms_config_dim") - .startupOptions(StartupOptions.initial()) - .serverId(serverId) - .build(); + .databaseList(dimdatabase) + .tableList(dimdatabase + ".tms_config_dim") + .startupOptions(StartupOptions.initial()) + .serverId(serverId) + .build(); } - Log.error("不支持的操作类型!"); return null; } -- Gitee From f0b638b2b94f644b5374698aa1825f06d7fa6b66 Mon Sep 17 00:00:00 2001 From: meiyifan Date: Wed, 7 Aug 2024 22:10:16 +0800 Subject: [PATCH 6/6] =?UTF-8?q?=E6=A0=BC=E5=BC=8F=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/atguigu/tms/realtime/utils/CreateEnvUtil.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/tms-realtime/src/main/java/com/atguigu/tms/realtime/utils/CreateEnvUtil.java b/tms-realtime/src/main/java/com/atguigu/tms/realtime/utils/CreateEnvUtil.java index 47a83fc..ee23166 100644 --- a/tms-realtime/src/main/java/com/atguigu/tms/realtime/utils/CreateEnvUtil.java +++ b/tms-realtime/src/main/java/com/atguigu/tms/realtime/utils/CreateEnvUtil.java @@ -66,7 +66,6 @@ public class CreateEnvUtil { JsonDebeziumDeserializationSchema jsonDebeziumDeserializationSchema = new JsonDebeziumDeserializationSchema(false, config); - MySqlSourceBuilder builder = MySqlSource.builder() .hostname(mysqlHostname) .port(mysqlPort) @@ -74,9 +73,8 @@ public class CreateEnvUtil { .password(mysqlPasswd) .deserializer(jsonDebeziumDeserializationSchema); - String dwddatabase = "imrds_uat"; + String dwddatabase = "tms01"; String dimdatabase = "tms_config"; - //tms01 switch (option) { @@ -89,7 +87,7 @@ public class CreateEnvUtil { return builder .databaseList(dwddatabase) .tableList(dwdTables) - .startupOptions(StartupOptions.initial()) + .startupOptions(StartupOptions.latest()) .serverId(serverId) .build(); //读取维度数据 -- Gitee