1.flume版本:1.8
2.配置文件:
agent.sinks.csvSink.type = com.xxx.xxx.RollingFileSink
agent.sinks.csvSink.sink.directory = /data
# 此处配置不使用flume默认的滚动
agent.sinks.csvSink.sink.rollInterval = 0
agent.sinks.csvSink.sink.batchSize = 10000

3.pom配置:
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
    </plugins>
</build>
<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.8.0</version>
    </dependency>
</dependencies>

4.代码:
import java.io.*; import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.apache.flume.Channel; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.Transaction; import org.apache.flume.conf.Configurable; import org.apache.flume.formatter.output.PathManager; import org.apache.flume.formatter.output.PathManagerFactory; import org.apache.flume.instrumentation.SinkCounter; import org.apache.flume.sink.AbstractSink; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.flume.serialization.EventSerializer; import org.apache.flume.serialization.EventSerializerFactory; public class RollingFileSink extends AbstractSink implements Configurable { private static final Logger logger = LoggerFactory .getLogger(RollingFileSink.class); private static final long defaultRollInterval = 30; private static final int defaultBatchSize = 100; private int batchSize = defaultBatchSize; private File directory; private long rollInterval; private OutputStream outputStream; private ScheduledExecutorService rollService; private String serializerType; private Context serializerContext; private SimpleDateFormat yearFormat; private SimpleDateFormat monthFormat; private SimpleDateFormat dayFormat; private SimpleDateFormat hourFormat; private EventSerializer serializer; private SinkCounter sinkCounter; private PathManager pathController; private String dir; private volatile boolean shouldRotate; public RollingFileSink() { shouldRotate = false; } @Override public void configure(Context context) { String pathManagerType = context.getString("sink.pathManager", "DEFAULT"); dir = context.getString("sink.directory"); String rollInterval = 
context.getString("sink.rollInterval"); serializerType = context.getString("sink.serializer", "TEXT"); serializerContext = new Context(context.getSubProperties("sink." + EventSerializer.CTX_PREFIX)); Context pathManagerContext = new Context(context.getSubProperties("sink." + PathManager.CTX_PREFIX)); pathController = PathManagerFactory.getInstance(pathManagerType, pathManagerContext); Preconditions.checkArgument(dir != null, "Directory may not be null"); Preconditions.checkNotNull(serializerType, "Serializer type is undefined"); if (rollInterval == null) { this.rollInterval = defaultRollInterval; } else { this.rollInterval = Long.parseLong(rollInterval); } batchSize = context.getInteger("sink.batchSize", defaultBatchSize); this.directory = new File(dir); if (sinkCounter == null) { sinkCounter = new SinkCounter(getName()); } //格式化时间 yearFormat = new SimpleDateFormat("yyyy"); monthFormat = new SimpleDateFormat("yyyyMM"); dayFormat = new SimpleDateFormat("yyyyMMdd"); hourFormat = new SimpleDateFormat("yyyyMMddHH"); } @Override public void start() { logger.info("Starting {}...", this); sinkCounter.start(); super.start(); pathController.setBaseDirectory(directory); if (rollInterval > 0) { rollService = Executors.newScheduledThreadPool( 1, new ThreadFactoryBuilder().setNameFormat( "rollingFileSink-roller-" + Thread.currentThread().getId() + "-%d").build()); /* * Every N seconds, mark that it's time to rotate. We purposefully do NOT * touch anything other than the indicator flag to avoid error handling * issues (e.g. IO exceptions occuring in two different threads. * Resist the urge to actually perform rotation in a separate thread! 
*/ rollService.scheduleAtFixedRate(new Runnable() { @Override public void run() { logger.debug("Marking time to rotate file {}", pathController.getCurrentFile()); shouldRotate = true; } }, rollInterval, rollInterval, TimeUnit.SECONDS); } else { logger.info("RollInterval is not valid, file rolling will not happen."); } logger.info("RollingFileSink {} started.", getName()); } @Override public Status process() throws EventDeliveryException { Date date = new Date(); String year = yearFormat.format(date); String month = monthFormat.format(date); String day = dayFormat.format(date); String hour = hourFormat.format(date); //创建文件夹 String dirName = dir + File.separator + year + File.separator + month + File.separator + day; File file = new File(dirName); if (!file.exists()) { file.mkdirs(); logger.info("create dir" + dirName); } //创建文件 File currentFile = new File(dirName + File.separator + "data_" + hour + ".csv"); if (!currentFile.exists()) { try { currentFile.createNewFile(); } catch (IOException e) { e.printStackTrace(); } } if (outputStream == null) { logger.debug("Opening output stream for file {}", currentFile); try { // 注意这里的true,代表append到文件中 outputStream = new BufferedOutputStream( new FileOutputStream(currentFile,true)); serializer = EventSerializerFactory.getInstance( serializerType, serializerContext, outputStream); serializer.afterCreate(); sinkCounter.incrementConnectionCreatedCount(); } catch (IOException e) { sinkCounter.incrementConnectionFailedCount(); throw new EventDeliveryException("Failed to open file " + currentFile + " while delivering event", e); } } Channel channel = getChannel(); Transaction transaction = channel.getTransaction(); Event event = null; Status result = Status.READY; try { transaction.begin(); int eventAttemptCounter = 0; for (int i = 0; i < batchSize; i++) { event = channel.take(); if (event != null) { sinkCounter.incrementEventDrainAttemptCount(); eventAttemptCounter++; serializer.write(event); } else { // No events found, request 
back-off semantics from runner result = Status.BACKOFF; break; } } serializer.flush(); outputStream.flush(); transaction.commit(); sinkCounter.addToEventDrainSuccessCount(eventAttemptCounter); // 关闭流,时间到了的时候会写入另一个文件,否则会一直写入一个文件。 if (outputStream != null) { logger.debug("Closing file {}", currentFile); try { serializer.beforeClose(); outputStream.close(); sinkCounter.incrementConnectionClosedCount(); } catch (IOException e) { sinkCounter.incrementConnectionFailedCount(); throw new EventDeliveryException("Unable to rotate file " + currentFile + " while delivering event", e); } finally { serializer = null; outputStream = null; } } } catch (Exception ex) { transaction.rollback(); throw new EventDeliveryException("Failed to process transaction", ex); } finally { transaction.close(); } return result; } @Override public void stop() { logger.info("RollingFile sink {} stopping...", getName()); sinkCounter.stop(); super.stop(); if (outputStream != null) { logger.debug("Closing file {}", pathController.getCurrentFile()); try { serializer.flush(); serializer.beforeClose(); outputStream.close(); sinkCounter.incrementConnectionClosedCount(); } catch (IOException e) { sinkCounter.incrementConnectionFailedCount(); logger.error("Unable to close output stream. Exception follows.", e); } finally { outputStream = null; serializer = null; } } if (rollInterval > 0) { rollService.shutdown(); while (!rollService.isTerminated()) { try { rollService.awaitTermination(1, TimeUnit.SECONDS); } catch (InterruptedException e) { logger.debug("Interrupted while waiting for roll service to stop. " + "Please report this.", e); } } } logger.info("RollingFile sink {} stopped. 
Event metrics: {}", getName(), sinkCounter); } public File getDirectory() { return directory; } public void setDirectory(File directory) { this.directory = directory; } public long getRollInterval() { return rollInterval; } public void setRollInterval(long rollInterval) { this.rollInterval = rollInterval; } }5.打包成jar包,jar包放到flume的lib目录下