While developing and testing I installed a Kafka server on my local machine; this post records the startup steps and the producer/consumer code.
Installation and Setup
- macOS with Homebrew installed
- Install Zookeeper and Kafka
```shell
brew install zookeeper
brew install kafka
```
To install a specific version with brew, see: Installing a Specific Version with Homebrew.
- Edit the Kafka configuration file
```shell
vim /usr/local/etc/kafka/server.properties
```
Find the line `#listeners=PLAINTEXT://:9092`, uncomment it, and change it to `listeners=PLAINTEXT://localhost:9092`.
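The edit above can be sanity-checked without restarting anything. A small standalone sketch (my own helper, not part of Kafka) that uses `java.util.Properties` — the same format `server.properties` uses — to show why the commented line has no effect:

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class ListenersCheck {
    // Returns the effective `listeners` value, or null while the line is still commented out.
    static String listenersOf(String config) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(config)); // java.util.Properties treats '#' lines as comments
        return props.getProperty("listeners");
    }

    public static void main(String[] args) throws IOException {
        System.out.println(listenersOf("#listeners=PLAINTEXT://:9092"));          // null: still commented out
        System.out.println(listenersOf("listeners=PLAINTEXT://localhost:9092"));  // PLAINTEXT://localhost:9092
    }
}
```

To check the real file, point `props.load` at a `FileReader` for `/usr/local/etc/kafka/server.properties` instead of the inline strings.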
Start Zookeeper
```shell
zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties
```
Start the Kafka Server
```shell
kafka-server-start /usr/local/etc/kafka/server.properties
```
- Here both Zookeeper and Kafka are started temporarily in the foreground.
- When shutting down, stop the Kafka Server first and Zookeeper second.
Create a Kafka Topic
```shell
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic_test_r1p1
```
For topic naming conventions, see: Kafka Topic Naming Tips.
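Whatever naming convention you adopt, Kafka itself only enforces a few hard rules: legal characters are `a-z`, `A-Z`, `0-9`, `.`, `_`, and `-`; the name is at most 249 characters; and it may not be `.` or `..`. A sketch of that check (my own re-implementation for illustration, not Kafka's internal validator):

```java
public class TopicName {
    private static final int MAX_LENGTH = 249;

    // Mirrors the hard constraints Kafka enforces at topic-creation time.
    static boolean isValid(String name) {
        return name != null
                && !name.isEmpty()
                && !name.equals(".")
                && !name.equals("..")
                && name.length() <= MAX_LENGTH
                && name.matches("[a-zA-Z0-9._-]+");
    }

    public static void main(String[] args) {
        System.out.println(isValid("topic_test_r1p1")); // true
        System.out.println(isValid("bad topic!"));      // false: space and '!' are illegal
    }
}
```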
Start a Producer for the Topic
```shell
kafka-console-producer --broker-list localhost:9092 --topic topic_test_r1p1
```
Consumer-side Test

Type a few test messages into the producer console; the consumer started later should receive them:
```shell
➜ ~ kafka-console-producer --broker-list localhost:9092 --topic topic_test_r1p1
>test123
>测试数据
```
- Kafka Tools
  You can download the Kafka Tools client to inspect the produced data.
pom.xml Dependencies
```xml
<properties>
    <flink.version>1.9.0</flink.version>
    <scala.binary.version>2.11</scala.binary.version>
    <scala.version>2.11.4</scala.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.70</version>
    </dependency>
</dependencies>
```
Flink Code
```scala
package test.demo

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010

object Kafka2FlinkDemo {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Consumer configuration: local broker, auto-commit offsets every 6 s
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "1")
    properties.setProperty("enable.auto.commit", "true")
    properties.setProperty("auto.commit.interval.ms", "6000")

    val topics = "topic_test_r1p1"
    val kafkaConsumer = new FlinkKafkaConsumer010(
      topics,
      new SimpleStringSchema(),
      properties
    ).setStartFromLatest()

    env
      .addSource(kafkaConsumer)
      .map(r => {
        try {
          println(r)
        } catch {
          case e1: Exception => println(e1.getMessage)
        }
      })

    env.execute("kafkaConsumer Print")
  }
}
```
Seeing the messages printed in the console means they have been consumed successfully.
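The demo's `map` operator only prints each record inside a try/catch. In a real job that operator would parse and validate the payload; a minimal sketch of such a handler (a hypothetical helper of my own, kept free of Kafka and Flink so it can be tested in isolation) that guards against null or blank records instead of letting an exception escape the operator:

```java
public class RecordHandler {
    // Hypothetical stand-in for the body of the map operator in the Flink demo:
    // validates the raw record before any further processing.
    static String handle(String raw) {
        if (raw == null || raw.trim().isEmpty()) {
            return "SKIPPED: empty or null record";
        }
        return "OK: " + raw.trim();
    }

    public static void main(String[] args) {
        System.out.println(handle("test123")); // OK: test123
        System.out.println(handle("   "));     // SKIPPED: empty or null record
    }
}
```

In the Scala job this logic would live inside the `map` lambda, replacing the bare `println(r)`.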