I. Flink Sink Overview
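1. What Is a Sink
A sink is the last stage of a Flink data processing pipeline. It writes the computed results to an external storage system or to a downstream processing system. In Flink's programming model, a sink is the terminal operation of the DataStream API: it consumes a DataStream and writes its records to the specified external system.

2. Types of Sinks
Flink's sink connectors fall into the following categories:
- Built-in sinks: debugging outputs such as print() and printToErr()
- File system sinks: write to the local file system, HDFS, and similar storage
- Message queue sinks: for example Kafka and RabbitMQ
- Database sinks: for example JDBC and Elasticsearch
- Custom sinks: implement the SinkFunction interface to define your own output logic

3. Output Semantics Guarantees
Flink offers three delivery guarantees for sinks:
- At-most-once: data may be lost but is never duplicated
- At-least-once: data is never lost but may be duplicated
- Exactly-once: data is neither lost nor duplicated

These guarantees are closely tied to Flink's checkpoint mechanism, which we discuss in detail later.

II. Environment Setup and Dependencies
1. Versions
- Flink: 1.20.1
- JDK: 17
- Gradle: 8.3
- External systems: Kafka 3.4.0, Elasticsearch 7.17.0, MySQL 8.0

2. Core Dependencies
```groovy
dependencies {
    // Flink core dependencies
    implementation 'org.apache.flink:flink-core:1.20.1'
    implementation 'org.apache.flink:flink-streaming-java:1.20.1'
    implementation 'org.apache.flink:flink-clients:1.20.1'

    // Kafka connector
    implementation 'org.apache.flink:flink-connector-kafka:3.4.0-1.20'

    // Elasticsearch connector
    implementation 'org.apache.flink:flink-connector-elasticsearch7:3.1.0-1.20'

    // JDBC connector
    implementation 'org.apache.flink:flink-connector-jdbc:3.3.0-1.20'
    implementation 'mysql:mysql-connector-java:8.0.33'

    // FileSystem connector
    implementation 'org.apache.flink:flink-connector-files:1.20.1'
}
```

III. Basic Sink Operations
1. Built-in Debug Sinks
Flink provides several built-in sinks for use during development and debugging:
```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BasicSinkDemo {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Create a small bounded source
        DataStream<String> stream = env.fromElements("Hello", "Flink", "Sink");

        // Print to standard output; each line is prefixed with the identifier "StandardOutput"
        stream.print("StandardOutput");

        // Print to standard error; each line is prefixed with the identifier "ErrorOutput"
        stream.printToErr("ErrorOutput");

        // Submit and run the job
        env.execute("Basic Sink Demo");
    }
}
```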
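The three delivery guarantees described in the overview differ only in what a sink emits around a failure. The following is a minimal, framework-free sketch in plain Java 17 (no Flink classes; `DeliverySemanticsDemo`, `Mode` and `run` are hypothetical names for illustration). It assumes a job whose last checkpoint covered records `[0, checkpointOffset)`, whose sink had written `[0, written)` when it crashed, and whose source had already read `[0, read)`:

```java
import java.util.ArrayList;
import java.util.List;

public class DeliverySemanticsDemo {

    enum Mode { AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE }

    // Returns the records a downstream reader ends up seeing after one crash and restart
    static List<Integer> run(Mode mode, int total, int checkpointOffset, int written, int read) {
        List<Integer> output = new ArrayList<>();
        // Before the crash the sink had already written records [0, written)
        for (int i = 0; i < written; i++) {
            output.add(i);
        }
        int resumeFrom;
        switch (mode) {
            case AT_MOST_ONCE:
                // Resume after everything the source had read: in-flight records [written, read) are lost
                resumeFrom = read;
                break;
            case AT_LEAST_ONCE:
                // Rewind to the checkpoint: records [checkpointOffset, written) are written a second time
                resumeFrom = checkpointOffset;
                break;
            default:
                // EXACTLY_ONCE: writes after the last checkpoint were never committed,
                // so they are rolled back before replaying from the checkpoint
                output.subList(checkpointOffset, output.size()).clear();
                resumeFrom = checkpointOffset;
        }
        for (int i = resumeFrom; i < total; i++) {
            output.add(i);
        }
        return output;
    }

    public static void main(String[] args) {
        for (Mode mode : Mode.values()) {
            System.out.println(mode + " -> " + run(mode, 6, 2, 4, 5));
        }
    }
}
```

With six records, a checkpoint after record 1, four records written and five read, the program prints `AT_MOST_ONCE -> [0, 1, 2, 3, 5]`, `AT_LEAST_ONCE -> [0, 1, 2, 3, 2, 3, 4, 5]` and `EXACTLY_ONCE -> [0, 1, 2, 3, 4, 5]`. The exactly-once branch models a transactional sink: rewinding the source alone only gives at-least-once, and the sink must also discard its uncommitted output.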
2. File System Sink
Flink can write data to the local file system, HDFS, and other file systems. The following example writes to the local file system:
```java
package com.cn.daimajiangxin.flink.sink;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.time.Duration;

public class FileSystemSinkDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // In STREAMING mode FileSink finalizes part files only on successful checkpoints;
        // without checkpointing they stay in the in-progress or pending state
        env.enableCheckpointing(5000);

        DataStream<String> stream = env.fromData("Hello", "Flink", "FileSystem", "Sink");

        // Roll to a new part file every 15 minutes, after 5 minutes without new data,
        // or once the current part file reaches 64 MiB
        RollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.builder()
                .withRolloverInterval(Duration.ofMinutes(15))
                .withInactivityInterval(Duration.ofMinutes(5))
                .withMaxPartSize(MemorySize.ofMebiBytes(64))
                .build();

        // Create a row-encoded file sink: each record is written as one UTF-8 line
        FileSink<String> sink = FileSink
                .forRowFormat(new Path("file:///tmp/flink-output"), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(rollingPolicy)
                .build();

        // Attach the sink
        stream.sinkTo(sink);

        env.execute("File System Sink Demo");
    }
}
```
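The three DefaultRollingPolicy settings in FileSystemSinkDemo act as independent triggers: whichever threshold is reached first closes the current part file. As a rough, hypothetical illustration (plain Java; `RollingDecisionSketch` and `rollReason` are made-up names, not Flink classes, and the exact boundary comparisons inside Flink may differ), the decision can be modeled like this:

```java
import java.time.Duration;

public class RollingDecisionSketch {
    // Thresholds mirroring the rolling policy configured in FileSystemSinkDemo
    static final Duration ROLLOVER_INTERVAL = Duration.ofMinutes(15);
    static final Duration INACTIVITY_INTERVAL = Duration.ofMinutes(5);
    static final long MAX_PART_SIZE_BYTES = 64L * 1024 * 1024; // 64 MiB

    /**
     * Returns why the current part file would be rolled, or "keep-open" if no threshold is reached.
     *
     * @param sizeBytes      bytes written to the current part file so far
     * @param sinceCreated   time elapsed since the part file was opened
     * @param sinceLastWrite time elapsed since the last record was appended to it
     */
    static String rollReason(long sizeBytes, Duration sinceCreated, Duration sinceLastWrite) {
        if (sizeBytes >= MAX_PART_SIZE_BYTES) {
            return "max-part-size";
        }
        if (sinceCreated.compareTo(ROLLOVER_INTERVAL) >= 0) {
            return "rollover-interval";
        }
        if (sinceLastWrite.compareTo(INACTIVITY_INTERVAL) >= 0) {
            return "inactivity-interval";
        }
        return "keep-open";
    }

    public static void main(String[] args) {
        System.out.println(rollReason(70L * 1024 * 1024, Duration.ofMinutes(2), Duration.ofSeconds(1)));
        System.out.println(rollReason(1_000, Duration.ofMinutes(16), Duration.ofSeconds(1)));
        System.out.println(rollReason(1_000, Duration.ofMinutes(8), Duration.ofMinutes(6)));
        System.out.println(rollReason(1_000, Duration.ofMinutes(8), Duration.ofMinutes(1)));
    }
}
```

The program prints `max-part-size`, `rollover-interval`, `inactivity-interval` and `keep-open`. In the real sink, the time-based conditions are checked periodically (the row-format builder exposes `withBucketCheckInterval` for this), and a rolled file only becomes a finished, visible file after the next successful checkpoint.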
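IV. Advanced Sink Connectors
1. Kafka Sink
Kafka is a message queue commonly used in real-time data processing, and Flink provides full-featured Kafka sink support:
```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

public class KafkaSinkDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Enable checkpointing: exactly-once Kafka transactions are committed on checkpoints
        env.enableCheckpointing(5000);

        DataStream<String> stream = env.fromElements("Hello Kafka", "Flink to Kafka", "Data Pipeline");

        // Kafka producer configuration
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        // Must not exceed the broker's transaction.max.timeout.ms (15 minutes by default)
        props.setProperty("transaction.timeout.ms", "600000");

        // Create the Kafka sink
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setKafkaProducerConfig(props)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("flink-output-topic")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                // Without this the sink defaults to DeliveryGuarantee.NONE
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                // Required for EXACTLY_ONCE; keep it unique per application on the same cluster
                .setTransactionalIdPrefix("flink-kafka-sink-demo")
                .build();

        // Attach the sink
        stream.sinkTo(sink);

        env.execute("Kafka Sink Demo");
    }
}
```
[Figure: messages in the Kafka message queue]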
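With `DeliveryGuarantee.EXACTLY_ONCE`, the Kafka sink writes the records of each checkpoint interval inside a Kafka transaction and commits that transaction only after the checkpoint completes, a two-phase commit. The plain-Java model below is a hypothetical sketch of that protocol (`TwoPhaseCommitSketch` and its methods are illustrative names, not the KafkaSink API); the `committed` list stands for what a consumer configured with `isolation.level=read_committed` can read:

```java
import java.util.ArrayList;
import java.util.List;

public class TwoPhaseCommitSketch {
    // Records visible to a read_committed consumer
    private final List<String> committed = new ArrayList<>();
    // Open transaction: records written since the last checkpoint barrier
    private List<String> openTxn = new ArrayList<>();
    // Transaction sealed at a checkpoint barrier, waiting for the checkpoint to complete
    private final List<String> preCommitted = new ArrayList<>();

    void write(String record) {
        openTxn.add(record);
    }

    // Phase 1 (checkpoint barrier): flush and seal the open transaction, then start a new one
    void preCommit() {
        preCommitted.addAll(openTxn);
        openTxn = new ArrayList<>();
    }

    // Phase 2 (checkpoint completed): make the sealed records visible
    void commit() {
        committed.addAll(preCommitted);
        preCommitted.clear();
    }

    // Failure before the next checkpoint: the open transaction is aborted and never becomes visible
    void abortOpen() {
        openTxn = new ArrayList<>();
    }

    List<String> visible() {
        return List.copyOf(committed);
    }

    // Checkpoint, crash before the next checkpoint, replay from the checkpoint, checkpoint again
    static List<String> runScenario() {
        TwoPhaseCommitSketch sink = new TwoPhaseCommitSketch();
        sink.write("Hello Kafka");
        sink.write("Flink to Kafka");
        sink.preCommit();
        sink.commit();
        sink.write("Data Pipeline"); // written after the checkpoint
        sink.abortOpen();            // job fails: the uncommitted write is discarded
        sink.write("Data Pipeline"); // the source replays from the checkpoint
        sink.preCommit();
        sink.commit();
        return sink.visible();
    }

    public static void main(String[] args) {
        System.out.println(runScenario());
    }
}
```

The program prints `[Hello Kafka, Flink to Kafka, Data Pipeline]`: the record that was written twice appears once. The model also shows two practical consequences: data reaches `read_committed` consumers only at checkpoint boundaries (every 5 seconds in KafkaSinkDemo), and consumers left at Kafka's default `read_uncommitted` isolation level can still see records from aborted transactions.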