kafka-connect-fs 二次开发总结

xiaoxiao2025-08-14  31

   kafka-connect-fs 项目是一个开源的fluent connecor,它可以从文件系统读取文件,并加载到kafka中。它支持以下特性    

Several sort of File Systems (FS) to use.Dynamic and static URIs to ingest data from.Policies to define rules about how to look for files.File readers to parse and read different kind of file format

   支持的文件格式包括Avro、Parquet、SequenceFile、Text以及Delimited text ,并且支持Simple policy和Sleepy policy,项目具体明细可查看github:https://github.com/mmolimar/kafka-connect-fs/。

   在实际应用中,需要读取AWS S3上的gz文件,在kafka-connect-fs中未有相关FileReader实现,在整体的处理框架中,该项目采用了策略模式,根据实际应用需求,需要实现符合实际需求的FileReader和policy策略类。

   1、AwsTextFileReader

       实现从AWS s3 中读取gz的压缩文件,其中主要用到GZIPInputStream即可,即可读取压缩的文件流,但在给定一个较多文件的S3文件路径进行读取时,读取的文件达到一定时,会出现以下错误:

     原因分析:在github中有人提交这个issues:Missing reader.close ? 但是作者添加的reader.close方法并不是很全,因为文件可能包含两种内容格式,1是全部是数据,没有文件头,2 是第一行为文件头,即数据的schema信息,其余行为数据,作者的解决方式只是解决了2文件格式内容的问题,但对于1文件格式内容未完全解决,在此情况下,其实现逻辑会将reader重新赋值,但原来的reader并未销毁,文件读取到一定程度的情况下,会存在大量未关闭的文件reader,最后导致上述问题。

    解决方式:

@Override public void seek(Offset offset) { if (offset.getRecordOffset() < 0) { throw new IllegalArgumentException("Record offset must be greater than 0"); } try { if (offset.getRecordOffset() < reader.getLineNumber()) { this.reader.close(); this.reader = null; this.reader = new LineNumberReader(new InputStreamReader(new GZIPInputStream(getFs().open(getFilePath())),this.charset)); currentLine = null; } while ((currentLine = reader.readLine()) != null) { if (reader.getLineNumber() - 1 == offset.getRecordOffset()) { this.offset.setOffset(reader.getLineNumber()); return; } } this.offset.setOffset(reader.getLineNumber()); } catch (IOException ioe) { throw new ConnectException("Error seeking file " + getFilePath(), ioe); } }

    重置reader的时候将reader close并置为空即可,参照L8-9行。

 2:*Policy  策略类

     根据实际的情况编写自己的Policy类,并继承 AbstractPolicy即可。

   二次开发完成之后进行maven编译 mvn package -DskipTests 。将编译的文件同一复制到/usr/local/confluent-5.0.0/share/java/下,并在confluent worker节点到配置添加此connector , 

plugin.path=/usr/local/confluent-5.0.0/share/java/kafka-connect-fs

  

转载请注明原文地址: https://www.6miu.com/read-5034834.html

最新回复(0)