深入浅出JMS(四)--ActiveMQ[消费者]实时持久化数据到HDFS

xiaoxiao2021-02-28  16

第一篇博文深入浅出JMS(一)–JMS基本概念,我们介绍了JMS的两种消息模型:点对点和发布订阅模型,以及消息被消费的两个方式:同步和异步,JMS编程模型的对象,最后说了JMS的优点。

第二篇博文深入浅出JMS(二)–ActiveMQ简单介绍以及安装,我们介绍了消息中间件ActiveMQ,安装,启动,以及优缺点。

第三篇博文深入浅出JMS(三)–ActiveMQ简单的HelloWorld实例,我们实现了一种点对点的同步消息模型,并没有给大家呈现发布订阅模型。

前言

功能介绍:使用ActiveMQ接收爬虫端实时消息,实时持久化到HDFS ; [ 在实际项目中,实时持久化到HDFS会存在问题,将在另一篇文章中详细介绍:HDFS文件写入FSDataOutputStream中的持久化hsync()不起作用详解 ]

环境准备

依赖的文件:集群hadoop相关配置文件


pom.xml文件:

<dependencies> <!-- hadoop 相关maven依赖 --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.5.1</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>2.5.1</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.5.1</version> </dependency> <!-- activemq 相关maven依赖 --> <dependency> <groupId>javax.jms</groupId> <artifactId>jms</artifactId> <version>1.1</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-core</artifactId> <version>5.1.0</version> </dependency> </dependencies>

项目代码

[消息消费者] : JMSConsumer.java

import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSOutputStream; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import javax.jms.*; import java.io.IOException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.EnumSet; /** * @author chenhaolin * @date Aug 30, 2017 11:11:36 AM * @parameter 消息的消费者(接受者) * "adtime","adtime8888!@#","tcp://192.168.70.78:61616?jms.prefetchPolicy.all=100" * ec_tmall_item */ public class JMSConsumer { private static final String NAME = "adtime"; //连接用户名 private static final String PWD = "adtime8888!@#"; //连接密码 private static final String URL = "tcp://192.168.70.78:61616?jms.prefetchPolicy.all=100"; //连接地址 public static Configuration conf = new Configuration(); public static FileSystem fileSystem; public static void main(String[] args) throws JMSException, IOException, InterruptedException { ConnectionFactory factory; //连接工厂 Connection connection = null; //连接 Session session; //会话 接受或者发送消息的线程 Destination destination; //消息的目的地 MessageConsumer consumer; //消费者 factory = new ActiveMQConnectionFactory(NAME, PWD, URL); //实例化工厂 connection = factory.createConnection(); //通过链接工厂获得连接 connection.start(); //启动连接 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //创建会话 destination = session.createQueue("ec_tmall_item"); //创建一个连接 ec_tmall_item 的消息队列 consumer = session.createConsumer(destination); //创建消息消费者 fileSystem = FileSystem.get(conf); //初始化Hadoop文件系统 Path path = null; // HDFS输出路径 FSDataOutputStream outputStream = null; //HDFS输出流 while (true) { /** * 初始化输出路径 */ Date date = new Date(); SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHH"); String res = sdf.format(date); SimpleDateFormat sdf2 = new SimpleDateFormat("yyyyMMddHHmm"); String res2 = sdf2.format(date); String filename = res2.substring(10, 11); String pathStr = "/group/user/chenhaolin/meta/hive-temp-table/chenhaolin.db/aq/pt_date=" + res + "/00000_" + filename; path = new Path(pathStr); //初始化输出流 if (!fileSystem.exists(path)) { outputStream = fileSystem.create(path); } //接受每一条消息 TextMessage textMessage = (TextMessage) consumer.receive(); //对消息处理 String result = textMessage.getText() + "\n"; //输出 outputStream.write(result.getBytes("UTF-8")); outputStream.hsync(); //实时持久化处理 DFSOutputStream dfsOutputStream = (DFSOutputStream) outputStream.getWrappedStream(); EnumSet<HdfsDataOutputStream.SyncFlag> syncFlags = EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH); dfsOutputStream.hsync(syncFlags); } } }

总结

实际项目中主要遇到的问题是,接收数据存在HDFS上的持久化问题;其中输出流FSDataOuputStream 自带的 hflush,hsync 方法均不能实现实时持久化数据到HDFS,只有在close的时候才能完全持久化,但是这又不符合业务场景,不间断实时接收持久化数据。将在另一篇文章中做详细介绍 : HDFS文件写入FSDataOutputStream中的持久化hsync()不起作用详解

转载请注明原文地址: https://www.6miu.com/read-200083.html

最新回复(0)