Streamsql&&Kafka demo

xiaoxiao2021-02-28  67

一、建Kafka数据源 1. 登陆您集群中的一台安装了Kafka的节点(node4,192.168.100.54)(zookeeper 192.168.100.51-53)。进入/usr/lib/kafka/bin目录,该目录下有建Kafka数据源的 所需要的一些脚本。 1. 建一个Kafka Topic 执行下面指令,运行/usr/lib/kafka/bin目录下的kafka-create-topic.sh脚本: ./kafka-create-topic.sh --partition 3 --topic demo --zookeeper 192.168.100.51:2181 --broker 192.168.100.54:9092 2. 查看Kafka Topic 执行下面指令,运行/usr/lib/kafka/bin目录下的kafka-list-topic.sh脚本 ./kafka-list-topic.sh --zookeeper 192.168.100.51:2181 3. 建Kafka producer并发布消息 执行下面指令,运行/usr/lib/kafka/bin目录下的kafka-console-producer.sh脚本: ./kafka-console-producer.sh --broker-list 192.168.100.54:9092 --topic demo(为kafka节点) 这里我们指定了使用172.16.1.128节点为Kafka broker(详细介绍见后文),并且指定了producer发布 消息的topic为demo。现在,我们可以在命令行中输入一些消息,这些消息都将被发布给demo 1. 登陆您集群中的任意一个节点,连接到Inceptor。这里,我们以hive用户身份连接一个LDAP认证 的Inceptor Server 2。 beeline -u "jdbc:hive2://localhost:10000/default" -n hive -p 123456 2. 建一个Stream CREATE STREAM demo_stream(id INT, letter STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' TBLPROPERTIES("topic"="demo","kafka.zookeeper"="192.168.100.51:2181"); 3.查看Stream SHOW STREAMS; 我们可以看到刚刚建好的demo_stream。 4. 创建并触发一个StreamJob a. 建一张新表demo_table,它的需要和demo_stream有相同的schema: CREATE TABLE demo_table(id INT, letter STRING); b. 向demo_table插入demo_stream中的数据,这个操作会触发StreamJob的执行: INSERT INTO demo_table SELECT * FROM demo_stream; 5. 列出正在运行的StreamJob 执行下面指令: LIST STREAMJOBS; 我们可以看到下面输出: 输出中包含streamid,触发StreamJob的sql和status。 6..在Inceptor管理界面4040查看Streamjob运行状态 可以看到在“Active Stages”下有正在运行的demo_stream。 7. 此时demo_stream已经开始接收发布到之前创建的demo的消息。需要注意的是,demo_stream对发布 到demo的消息的接收是从streamid=29008ed34b9e45bca784362948b88a85的StreamJob触发开始的,也就 是说从执行INSERT开始的,在执行INSERT之前发布到demo的消息都不会被demo_stream接收。所以,目 前demo_table中没有任何记录: SELECT * FROM demo_table; 二、接收并处理Kafka传来的数据 1. 切换到正在运行向demo发布数据的Kafka producer的页面: ./kafka-console-producer.sh --broker-list 192.168.100.54:9092 --topic demo hello world 由于“hello”,“world”都是在streamid=29008ed34b9e45bca784362948b88a85的StreamJob触发前发布的,这两条消息都不会被demo_stream接收。 (192.168.100.54为kafka集群的一个节点) 6 | 2. 快速入门 2. 在命令行中输入一些数据。由于我们已经规定了demo_stream接收消息的类型是由“,”分隔的两列文 本,我们需要输入这个类型的消息,以便demo_stream处理。输入: 3. 切换回到Inceptor命令行的窗口,现在查看demo_table中的数据,我们可以看到demo_table中有我们刚 才发布的四条消息: SELECT * FROM demo_table; 4. 现在我们可以在demo_table上进行一些查询: SELECT COUNT(*) FROM demo_table GROUP BY letter; 三、停止Streamjob 演示到此结束,用下面指令可以停止streamid为29008ed34b9e45bca784362948b88a85的streamjob: STOP STREAMJOB 29008ed34b9e45bca784362948b88a85; StreamSQL的优势 相对于采用编程(Scala/Java)的方式开发流应用,采用StreamSQL具有以下优势: 微批模式和事件驱动模式的一体化 在同一套系统里,用户可以根据业务需求,灵活切换微批模式的流处理和事件驱动模式的流处理。 极高的易用性 而使用StreamSQL,用户只需要有编写普通SQL的经验,就可以写出高效、安全、稳定的流处理应用。 性能提升 在一些条件下,采用StreamSQL的方式甚至比编程方式获得更高的性能提升。这是因为StreamSQL做了一些特殊优化,在编程模式下无法轻易实现。 产品化程度高 通过编程的方式来实现流处理的另一个问题是产品化程度非常低。由于编程有较高的自由度,出现问题的可能性很大;而又由于编程的方式将流处理平台和用户程序绑定在一起,用户没办法及时分析出出错问题的root cause。SQL作为一个通用的接口将大大地提高产品化程度。 迁移成本低 用户原有的很多业务逻辑是通过SQL实现,如果通过编程的方式迁移到流上,迁移成本非常高,难以保证迁移后逻辑的正确性。而一旦采用StreamSQL,用户只需要修改少量SQL,迁移成本几乎接近零。 由于具备上述几项优点,Inceptor StreamSQL将会在建立流式应用时,表现出其强大的业务应对能力和易用性。用户将发现流式分析的实现过程也可以很便捷。
转载请注明原文地址: https://www.6miu.com/read-27274.html

最新回复(0)