博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
使用Flume将日志导入OSS
阅读量:6225 次
发布时间:2019-06-21

本文共 4566 字,大约阅读时间需要 15 分钟。

前言

Apache Flume是一个高可用、高可靠的分布式日志采集、聚合与传输的系统。它基于流式的数据传输,架构简单、灵活。它简单可扩展的模型,也适合在线的数据分析。

image

上图是它的简单数据流模型。Flume的数据流由Event贯穿始终,这个Event由外部的Source生成(如Web Server),携带日志数据并带有一些额外的信息。Source捕获到Event后会进行格式化,然后会推送到一个或者多个Channel中。可以认为Channel是一个Event的缓冲区,它将负责保存Event直到Sink处理完该事件。

Flume的核心运行组件是Agent,一个Agent是一个JVM,是一个完整的数据收集工具。Flume的Agent由三部分组成:Source、Channel和Sink。其中,Flume目前支持了很多种类型的Sink: HDFS Sink、HBase Sink、Thrift Sink、Avro Sink、Elastic Search Sink等。本文主要介绍如何使用HDFS Sink使Flume能够将数据导入到OSS。

配置

配置主要分三块:Java、Hadoop、Flume。配置Java比较简单,设置好JAVA_HOME即可。

Hadoop

首先需要下载Hadoop(以3.0.1版本为例)。Hadoop从2.9.1与3.0.0开始支持OSS,作为默认支持的文件系统。Hadoop的配置可以参考。主要是修改$HADOOP_HOME/ etc/hadoop/core-site.xml配置文件,并且配好fs.oss.endpoint、fs.oss.accessKeyId、fs.oss.accessKeySecret和fs.oss.impl。

Flume

下载Apache Flume 1.8.0,并且修改conf目录下的文件

mv conf/flume-conf.properties.template cpnf/flume-conf.propertiesmv conf/flume-env.sh.template conf/flume-env.sh

修改conf/flume-env.sh文件,添加JAVA_HOME与FLUME_CLASSPATH,其中,需要将HADOOP的lib与conf目录放入到FLUME_CLASSPATH里面。示例如下

export JAVA_HOME=/usr/lib/jdk1.8.0_152export FLUME_CLASSPATH=$HADOOP_HOME/etc/hadoop:$HADOOP_HOME/share/hadoop/common:$HADOOP_HOME/share/hadoop/hdfs:$HADOOP_HOME/share/hadoop/tools/lib/*

新增conf/hdfs.properties,在这个里面配置Source、Channel与Sink。假设我们想监控某个目录下新产生的文件,并把新产生的文件写到OSS里面去,我们可以这样配置:

LogAgent.sources = apacheLogAgent.channels = fileChannelLogAgent.sinks = HDFS#sources configLogAgent.sources.apache.type = spooldirLogAgent.sources.apache.spoolDir = /tmp/logsLogAgent.sources.apache.channels = fileChannelLogAgent.sources.apache.basenameHeader = trueLogAgent.sources.apache.basenameHeaderKey = fileName#sinks configLogAgent.sinks.HDFS.channel = fileChannelLogAgent.sinks.HDFS.type = hdfsLogAgent.sinks.HDFS.hdfs.path = oss://{Your Bucket Name}/logs/%Y%m%dLogAgent.sinks.HDFS.hdfs.fileType = DataStreamLogAgent.sinks.HDFS.hdfs.writeFormat = TEXTLogAgent.sinks.HDFS.hdfs.filePrefix = %{fileName}.%H:%M:%SLogAgent.sinks.HDFS.hdfs.fileSuffix = .logLogAgent.sinks.HDFS.hdfs.batchSize = 1000LogAgent.sinks.HDFS.hdfs.rollSize = 0LogAgent.sinks.HDFS.hdfs.rollCount = 0LogAgent.sinks.HDFS.hdfs.rollInterval = 30LogAgent.sinks.HDFS.hdfs.useLocalTimeStamp = true#channels configLogAgent.channels.fileChannel.type = memoryLogAgent.channels.fileChannel.capacity = 1000000LogAgent.channels.fileChannel.transactionCapacity = 10000

注意修改 LogAgent.sinks.HDFS.hdfs.path配置项为用户自己在OSS上路径。

启动Agent

bin/flume-ng agent --conf-file  conf/hdfs.properties -c conf/ --name LogAgent -Dflume.root.logger=console

这个时候会发现抛出异常:

Exception in thread "SinkRunner-PollingRunner-DefaultSinkProcessor" java.lang.NoSuchFieldError: INSTANCE    at org.apache.http.conn.ssl.SSLConnectionSocketFactory.
(SSLConnectionSocketFactory.java:144) at com.aliyun.oss.common.comm.DefaultServiceClient.createHttpClientConnectionManager(DefaultServiceClient.java:244) at com.aliyun.oss.common.comm.DefaultServiceClient.
(DefaultServiceClient.java:85) at com.aliyun.oss.OSSClient.
(OSSClient.java:209) at org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystemStore.initialize(AliyunOSSFileSystemStore.java:150) at org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem.initialize(AliyunOSSFileSystem.java:318) at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3288) at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:123) at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3337) at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3305) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:476) at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361) at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:260) at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:252) at org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:701) at org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50) at org.apache.flume.sink.hdfs.BucketWriter$9.call(BucketWriter.java:698) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)

这个异常是由于Apache Flume使用的http client版本比较低的缘故,把flume lib目录下的httpclient-4.2.1.jar和httpcore-4.4.1.jar删除,重新启动即可。

然后我们往刚才配置的LogAgent.sources.apache.spoolDir里面放文件,使用如下命令

for((i=0;i<=5;i++)); do echo "Hello, world" >> /tmp/logs/sample.data.$i; sleep 30s; done

Flume会每隔30秒钟检查这个目录,最终的效果如下:

image

参考资料

转载地址:http://chnna.baihongyu.com/

你可能感兴趣的文章
我的友情链接
查看>>
nginx在reload时候报错invalid PID number
查看>>
神经网络和深度学习-第二周神经网络基础-第二节:Logistic回归
查看>>
Myeclipse代码提示及如何设置自动提示
查看>>
c/c++中保留两位有效数字
查看>>
ElasticSearch 2 (32) - 信息聚合系列之范围限定
查看>>
VS2010远程调试C#程序
查看>>
[MicroPython]TurniBit开发板DIY自动窗帘模拟系统
查看>>
由String类的Split方法所遇到的两个问题
查看>>
Python3.4 12306 2015年3月验证码识别
查看>>
从Handler.post(Runnable r)再一次梳理Android的消息机制(以及handler的内存泄露)
查看>>
windows查看端口占用
查看>>
Yii用ajax实现无刷新检索更新CListView数据
查看>>
JDBC的事务
查看>>
Io流的概述
查看>>
App 卸载记录
查看>>
JavaScript变量和作用域
查看>>
开源SIP服务器加密软件NethidPro升级
查看>>
《别做正常的傻瓜》的一些读书心得
查看>>
作业:实现简单的shell sed替换功能和修改haproxy配置文件
查看>>