Related Resources
Flink SQL
-- 1) Register the Paimon catalog
CREATE CATALOG paimon_hive
WITH (
  'type' = 'paimon',
  'warehouse' = 'hdfs://xxxxx/paimon',
  'metastore' = 'hive',
  'hive-conf-dir' = '/xxxxx/conf',
  'uri' = 'thrift://metastore-host1:9083,thrift://metastore-host2:9083'
);

-- 2) Declare the Kafka source
CREATE TABLE kafkaSource (
  `_timestamp` STRING,
  `name` STRING,
  `age` STRING,
  `id` STRING
) WITH (
  'connector' = 'kafka',
  'format' = 'json',
  'topic' = 'topic1234',
  'properties.bootstrap.servers' = 'your-kafka-brokers',
  'properties.group.id' = 'kafka-to-paimon',
  'scan.startup.mode' = 'latest-offset'
);

-- 3) Read from Kafka and write into Paimon
INSERT INTO paimon_hive.paimon.odm_kafka_log
SELECT
  name AS `name`,
  age AS `age`,
  id AS `id`,
  FROM_UNIXTIME(CAST(CAST(`_timestamp` AS BIGINT) / 1000 AS BIGINT), 'yyyyMMdd') AS `day`
FROM kafkaSource;
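The INSERT above writes to paimon_hive.paimon.odm_kafka_log, whose DDL is not shown. Below is a minimal, hypothetical sketch of how that table could be created once before submitting the job. The column layout (all STRING, matching the SELECT), the `paimon` database name, and the partitioning by the derived `day` column are assumptions, not the article's confirmed schema; the embedded statements can equally be pasted into the Flink SQL client.

package ddl;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

/**
 * One-off sketch that creates the assumed target table for the Flink SQL
 * pipeline above. Requires paimon-flink on the classpath.
 */
public class CreateOdmKafkaLog {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Same catalog as above, reduced to the filesystem-warehouse form for brevity
        tEnv.executeSql("CREATE CATALOG paimon_hive WITH ('type' = 'paimon', 'warehouse' = 'hdfs://xxxxx/paimon')");
        tEnv.executeSql("USE CATALOG paimon_hive");
        tEnv.executeSql("CREATE DATABASE IF NOT EXISTS paimon");
        // Assumed layout: all columns STRING, partitioned by the derived `day` column
        tEnv.executeSql(
                "CREATE TABLE IF NOT EXISTS paimon.odm_kafka_log ("
                        + "  name STRING,"
                        + "  age STRING,"
                        + "  id STRING,"
                        + "  `day` STRING"
                        + ") PARTITIONED BY (`day`)");
    }
}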
Flink Table (Java)
Maven Dependencies
<!-- Flink dependencies -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>1.15.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.15.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>1.15.0</version>
</dependency>
<!-- Flink Table API classes -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>1.15.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>1.15.0</version>
</dependency>
<!-- Paimon dependency -->
<dependency>
    <groupId>org.apache.paimon</groupId>
    <artifactId>paimon-flink-1.15</artifactId>
    <version>0.5.0-incubating</version>
</dependency>
Job class
package job;

import com.google.protobuf.ByteString;
import function.GalaxyToPaimonFlatMap;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

/**
 * @Author zhangjinke
 * @Create 2023/12/25 17:02
 * @Description Writes Galaxy protobuf-format logs into Paimon.
 */
public class GalaxyToPaimonJob {
    private static final Logger LOG = LoggerFactory.getLogger(GalaxyToPaimonJob.class);
    private static final String GROUP_ID = "job.GalaxyToPaimonJob";

    public static void main(String[] args) {
        try {
            ParameterTool tool = ParameterTool.fromArgs(args);
            int source = tool.getInt("source");
            int flatmap = tool.getInt("flatmap");

            // Kafka consumer properties
            Properties galaxyPro = new Properties();
            galaxyPro.setProperty("bootstrap.servers", "your-kafka-brokers"); // placeholder
            galaxyPro.setProperty("group.id", GROUP_ID);
            // Interval for automatic detection of topic partition changes
            galaxyPro.setProperty("partition.discovery.interval.ms", "60000");
            galaxyPro.setProperty("refresh.leader.backoff.ms", "6000");

            // PropertyUtil is a project-local config reader; ByteStringSchema is sketched below
            KafkaSource<ByteString> galaxyKafkaSource = KafkaSource.<ByteString>builder()
                    .setTopics(PropertyUtil.get("user_event_etl_topic"))
                    .setValueOnlyDeserializer(new ByteStringSchema())
                    .setProperties(galaxyPro)
                    .setStartingOffsets(OffsetsInitializer.latest())
                    .build();

            // 1) Create the Flink streaming execution environment
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(120000L, CheckpointingMode.EXACTLY_ONCE);
            env.getCheckpointConfig().setMinPauseBetweenCheckpoints(180000L);
            env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
            env.getConfig().setAutoWatermarkInterval(0);
            env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(200, 60000 * 2L));
            env.setParallelism(32);

            // 2) Add the user+event Kafka source
            SingleOutputStreamOperator<Row> rsoso = env.fromSource(galaxyKafkaSource, WatermarkStrategy.noWatermarks(), "GalaxyToPaimonSource")
                    .uid("GalaxyToPaimonSource_Uid")
                    .name("GalaxyToPaimonSource_Name")
                    .setParallelism(source)
                    // 3) Extract the fields and emit one Row per record
                    .flatMap(new GalaxyToPaimonFlatMap())
                    .uid("GalaxyToPaimonFlatMapFunction_Uid")
                    .name("GalaxyToPaimonFlatMapFunction_Name")
                    .setParallelism(flatmap)
                    .returns(Types.ROW_NAMED(
                            new String[]{"realtime", "ip", "session_id", "app_id", "device_uuid", "day", "hour"},
                            Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING));

            // 4) Create the Flink table environment
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
            Schema schema = Schema.newBuilder()
                    .column("realtime", DataTypes.STRING())
                    .column("ip", DataTypes.STRING())
                    .column("session_id", DataTypes.STRING())
                    .column("app_id", DataTypes.STRING())
                    .column("device_uuid", DataTypes.STRING())
                    .column("day", DataTypes.STRING())
                    .column("hour", DataTypes.STRING())
                    .build();

            // 5) Create the Paimon catalog
            tableEnv.executeSql("CREATE CATALOG paimon_hive WITH ('type' = 'paimon', 'warehouse' = 'hdfs://xxxxx/paimon')");
            tableEnv.executeSql("USE CATALOG paimon_hive");

            // 6) Register the stream as a temporary view
            tableEnv.createTemporaryView("odm_event_realtime_view", rsoso, schema);

            // 7) Insert into the Paimon table; executeSql submits the job itself,
            //    so a separate env.execute() call is not needed here
            tableEnv.executeSql("INSERT INTO paimon.odm_event_realtime SELECT * FROM odm_event_realtime_view");
        } catch (Exception e) {
            LOG.error("GalaxyToPaimonJob failed to start!", e);
        }
    }
}
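The job references two project-local helpers that are not shown: PropertyUtil (a config reader) and ByteStringSchema (the Kafka value deserializer). A minimal sketch of what ByteStringSchema could look like, assuming it lives in the same job package and that the Kafka record value is the raw protobuf payload:

package job;

import com.google.protobuf.ByteString;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.io.IOException;

/**
 * Assumed implementation of the value-only deserializer used by the job:
 * it simply wraps the raw Kafka record bytes in a protobuf ByteString so the
 * downstream flatMap can call parseFrom on it. The original class is not shown
 * in the article.
 */
public class ByteStringSchema implements DeserializationSchema<ByteString> {
    @Override
    public ByteString deserialize(byte[] message) throws IOException {
        return message == null ? ByteString.EMPTY : ByteString.copyFrom(message);
    }

    @Override
    public boolean isEndOfStream(ByteString nextElement) {
        return false; // unbounded Kafka stream, never ends
    }

    @Override
    public TypeInformation<ByteString> getProducedType() {
        return TypeInformation.of(ByteString.class);
    }
}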
Function class
package function;

import com.google.protobuf.ByteString;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class GalaxyToPaimonFlatMap extends RichFlatMapFunction<ByteString, Row> {
    private static final Logger log = LoggerFactory.getLogger(GalaxyToPaimonFlatMap.class);
    private static final DateTimeFormatter inputDateFormat = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss");
    private static final DateTimeFormatter outputDateFormat = DateTimeFormatter.ofPattern("yyyyMMdd");
    private static final DateTimeFormatter outputHourFormat = DateTimeFormatter.ofPattern("yyyyMMddHH");

    @Override
    public void flatMap(ByteString bytes, Collector<Row> out) {
        try {
            // Result row: seven fields, matching the schema registered in the job
            Row row = new Row(7);

            // Parse the protobuf payload and assign the fields in order
            // (myProtoBufObj is the generated protobuf class, name anonymized)
            myProtoBufObjDataToProtoBuf.myProtoBufObj myProtoBufObj = myProtoBufObjDataToProtoBuf.myProtoBufObj.parseFrom(bytes);
            String realtime = myProtoBufObj.getRealtime();
            row.setField(0, realtime);
            row.setField(1, myProtoBufObj.getIp());
            row.setField(2, myProtoBufObj.getSessionId());
            row.setField(3, myProtoBufObj.getAppId());
            row.setField(4, myProtoBufObj.getDeviceUuid());
            // Derive the day/hour partition columns from the event time
            row.setField(5, LocalDateTime.parse(realtime, inputDateFormat).format(outputDateFormat));
            row.setField(6, LocalDateTime.parse(realtime, inputDateFormat).format(outputHourFormat));

            // Emit the Row downstream
            out.collect(row);
        } catch (Exception e) {
            log.error("function.GalaxyToPaimonFlatMap error is: ", e);
        }
    }
}
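As a quick sanity check of the day/hour derivation above, here is a tiny standalone example (the input timestamp is made up; the formatters are copied from the class):

package function;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** Verifies the day/hour partition-column derivation used in GalaxyToPaimonFlatMap. */
public class DayHourFormatCheck {
    public static void main(String[] args) {
        DateTimeFormatter input = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss");
        LocalDateTime t = LocalDateTime.parse("2023/12/25 17:02:00", input);
        // Prints 20231225 and 2023122517, the values written to `day` and `hour`
        System.out.println(t.format(DateTimeFormatter.ofPattern("yyyyMMdd")));
        System.out.println(t.format(DateTimeFormatter.ofPattern("yyyyMMddHH")));
    }
}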