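Flume

Flume has adapters for many kinds of inputs and outputs, which makes it suitable for collecting data from diverse sources.

Core concepts
event: a single message
client: the party that connects to an agent and sends it data
agent: the main components are Sources, Channels, and Sinks, plus Interceptors and Selectors
kafka: high throughput; used in high-concurrency scenarios

Note: a Flume agent configuration file must not contain stray spaces (for example, trailing spaces after a value).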
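The note about spaces can be checked mechanically. Agent files use Java properties syntax, where spaces around "=" are dropped but trailing spaces stay in the value, and a "#" that follows a value on the same line is not treated as a comment. The following is a minimal sketch, assuming the file is parsed the way java.util.Properties parses it; the class name ConfigWhitespaceCheck and its method are hypothetical helpers, not part of Flume.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

/** Hypothetical helper: flags property values that carry stray whitespace or an inline comment. */
final class ConfigWhitespaceCheck {

    /** Returns the sorted keys whose value has trailing whitespace or a "#" preceded by whitespace. */
    static List<String> findSuspiciousKeys(String configText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(configText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> suspicious = new ArrayList<>();
        for (String key : props.stringPropertyNames()) {
            String value = props.getProperty(key);
            boolean strayWhitespace = !value.equals(value.trim());
            // An inline comment ends up inside the value, e.g. "0.0.0.0 ## any ip".
            boolean inlineComment = value.matches(".*\\s#.*");
            if (strayWhitespace || inlineComment) {
                suspicious.add(key);
            }
        }
        Collections.sort(suspicious);
        return suspicious;
    }

    public static void main(String[] args) {
        String sample = String.join("\n",
            "agent1.channels.c1.type=memory ",               // trailing space is kept in the value
            "agent1.sinks.s1.type = logger",                  // spaces around '=' are dropped
            "agent1.sources.as1.bind=0.0.0.0 ## any ip");     // the comment becomes part of the value
        System.out.println(findSuspiciousKeys(sample));
    }
}
```

Running the check on a file before starting the agent is cheap; whether a flagged value actually breaks a given Flume component depends on the component and version, so treat the output as a list of lines to look at rather than as errors.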
I. Printing Flume events to the console

1. Create an agent (an avro source receives the network stream and a logger sink prints it on the Flume console). Configuration file agent1.conf:

cd /usr/local/flume/
vi conf/agent1.conf

agent1.sources=as1
agent1.channels=c1
agent1.sinks=s1
agent1.sources.as1.type=avro
# accept data sent from any IP
agent1.sources.as1.bind=0.0.0.0
# listen on port 21111
agent1.sources.as1.port=21111
agent1.sources.as1.channels=c1
agent1.channels.c1.type=memory
agent1.sinks.s1.type=logger
agent1.sinks.s1.channel=c1

2. Start agent1 and print its output on the console. Flume checks agent1.conf every 30 seconds and applies any change it finds.

bin/flume-ng agent --conf conf/ -Dflume.root.logger=DEBUG,console -n agent1 -f conf/agent1.conf

3. Verify the agent. There are two ways: test from the Flume command line, or write log4j logs from Java code.

1) Send the contents of the file ~/a to Flume:

bin/flume-ng avro-client --conf conf/ -H localhost -p 21111 -F ~/a

2) Use a Java class to write log4j logs to the Flume agent.

log4j.properties configuration file:

log4j.rootLogger=INFO,flume
log4j.appender.flume=org.apache.flume.clients.log4jappender.Log4jAppender
# IP of the node where the Flume agent runs
log4j.appender.flume.Hostname=192.168.1.33
# port the Flume agent listens on
log4j.appender.flume.Port=21111
log4j.appender.flume.UnsafeMode=true

Java code:

import org.apache.log4j.Logger;

public class FlumeProducer {
    public static void main(String[] args) throws Exception {
        final Logger logger = Logger.getLogger(FlumeProducer.class);
        // Emit one log line per second; the Log4jAppender forwards each line to the agent.
        while (true) {
            logger.info("logger datetime :" + System.currentTimeMillis());
            Thread.sleep(1000);
        }
    }
}

II. Writing avroLog files to HDFS, stored in separate /IP/date/ folders

1. Create an agent (an avro source receives the network stream and an hdfs sink writes it to HDFS). Configuration file agent2.conf:

cd /usr/local/flume/
vi conf/agent2.conf

agent2.sources=source1
agent2.channels=channel1
agent2.sinks=sink1
agent2.sources.source1.type=avro
agent2.sources.source1.bind=0.0.0.0
agent2.sources.source1.port=44444
agent2.sources.source1.channels=channel1
agent2.sources.source1.interceptors=i1 i2
agent2.sources.source1.interceptors.i1.type=org.apache.flume.interceptor.HostInterceptor$Builder
agent2.sources.source1.interceptors.i1.preserveExisting=true
agent2.sources.source1.interceptors.i1.useIP=true
agent2.sources.source1.interceptors.i2.type=org.apache.flume.interceptor.TimestampInterceptor$Builder
agent2.channels.channel1.type=memory
agent2.channels.channel1.capacity=10000
agent2.channels.channel1.transactionCapacity=1000
agent2.channels.channel1.keep-alive=30
agent2.sinks.sink1.type=hdfs
agent2.sinks.sink1.channel=channel1
# HDFS path that Flume writes to; %{host} comes from the host interceptor, the date from the timestamp interceptor
agent2.sinks.sink1.hdfs.path=hdfs://ns1/flume/events/%{host}/%Y-%m-%d
# prefix of the generated files
agent2.sinks.sink1.hdfs.filePrefix=avroLog-
# suffix of the generated files
agent2.sinks.sink1.hdfs.fileSuffix=.log
# type of the generated files: DataStream or SequenceFile
agent2.sinks.sink1.hdfs.fileType=DataStream
agent2.sinks.sink1.hdfs.writeFormat=Text
agent2.sinks.sink1.hdfs.rollInterval=0
agent2.sinks.sink1.hdfs.rollSize=10000
agent2.sinks.sink1.hdfs.rollCount=0
agent2.sinks.sink1.hdfs.idleTimeout=5

2. Start agent2, which writes the events to /flume/events/ on HDFS. Flume checks agent2.conf every 30 seconds and applies any change it finds.

bin/flume-ng agent --conf conf/ -Dflume.monitoring.type=http -Dflume.monitoring.port=34343 -n agent2 -f conf/agent2.conf

3. Use Java code to produce log4j logs and send them to Flume.

log4j.properties configuration file:

log4j.rootLogger=INFO,flume
log4j.appender.flume=org.apache.flume.clients.log4jappender.Log4jAppender
# IP of the node where the Flume agent runs
log4j.appender.flume.Hostname=192.168.1.33
# port the Flume agent listens on; it must match agent2.sources.source1.port (the original notes had 21111 here, copied from section I)
log4j.appender.flume.Port=44444
log4j.appender.flume.UnsafeMode=true

The Java code is the same FlumeProducer class as in section I.

4. Verify that agent2 wrote to the /flume/events/ folder on HDFS. Replace IP and yyyy-MM-dd with the actual host IP and date; one or more avroLog-.<timestamp>.log files means it worked.

hdfs dfs -ls -h -R /flume/events/IP/yyyy-MM-dd/

III. Writing to Flume from a socket client and saving the events to local files
1. Create agent_tcp.conf (receives data sent by a socket client and writes it to the local Linux file system)

cd /usr/local/flume
vi conf/agent_tcp.conf

agent_tcp.sources=as1
agent_tcp.channels=c1
agent_tcp.sinks=s1
agent_tcp.sources.as1.type=syslogtcp
agent_tcp.sources.as1.bind=0.0.0.0
agent_tcp.sources.as1.port=21111
agent_tcp.sources.as1.channels=c1
agent_tcp.channels.c1.type=memory
agent_tcp.channels.c1.capacity=10000
agent_tcp.channels.c1.transactionCapacity=10000
agent_tcp.channels.c1.keep-alive=120
agent_tcp.channels.c1.byteCapacityBufferPercentage=20
agent_tcp.channels.c1.byteCapacity=800000
agent_tcp.sinks.s1.type=file_roll
# kept from the original notes; the file_roll sink documents sink.rollInterval (seconds) for rolling, so this key may have no effect
agent_tcp.sinks.s1.rollSize=10000
agent_tcp.sinks.s1.sink.directory=/home/lefuBigDataDev/clouds/flume/logs
agent_tcp.sinks.s1.channel=c1

2. Start the agent_tcp agent

bin/flume-ng agent -n agent_tcp -c conf/ -f conf/agent_tcp.conf -Dflume.root.logger=DEBUG,console
3. Java socket client

package com.left.clouds.cluster.flume.test;

import java.io.OutputStream;
import java.net.Socket;

import org.junit.Before;
import org.junit.Test;

public class TestFlume {

    private Socket client = null;
    private OutputStream out = null;

    @Before
    public void before() {
        try {
            // Host and port of the node running the agent_tcp agent.
            client = new Socket("192.168.0.218", 21111);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Test
    public void sender() {
        try {
            out = client.getOutputStream();
            int i = 0;
            // Send one newline-terminated message every 4 seconds until the test is stopped.
            while (true) {
                out.write(("device-" + (i++) + "\n").getBytes());
                Thread.sleep(4000);
                System.out.println("Send #" + i + "...");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
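The syslogtcp source expects syslog-formatted lines, while the test above sends bare "device-N" text. The sketch below prefixes each message with a syslog priority, "<PRI>" where PRI = facility * 8 + severity. It assumes that this prefix plus a trailing newline is enough for the source to parse the line cleanly, which should be verified against the Flume version in use. The class SyslogLineClient and its methods are hypothetical, and main talks to a loopback ServerSocket that stands in for the agent so the example runs without Flume.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

/** Hypothetical client that sends syslog-style lines over TCP. */
final class SyslogLineClient {

    /** Builds one newline-terminated line "<PRI>message", with PRI = facility * 8 + severity. */
    static String formatSyslogLine(int facility, int severity, String message) {
        int priority = facility * 8 + severity;
        return "<" + priority + ">" + message + "\n";
    }

    /** Connects to host:port, writes `count` lines, then closes the socket. */
    static void sendLines(String host, int port, int count) throws IOException {
        try (Socket socket = new Socket(host, port);
             OutputStream out = socket.getOutputStream()) {
            for (int i = 0; i < count; i++) {
                // facility 1 (user-level), severity 6 (informational)
                String line = formatSyslogLine(1, 6, "device-" + i);
                out.write(line.getBytes(StandardCharsets.UTF_8));
            }
            out.flush();
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for the agent: a loopback server on an ephemeral port.
        try (ServerSocket server = new ServerSocket(0)) {
            int port = server.getLocalPort();
            Thread sender = new Thread(() -> {
                try {
                    sendLines("127.0.0.1", port, 3);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            sender.start();
            try (Socket peer = server.accept();
                 BufferedReader reader = new BufferedReader(
                     new InputStreamReader(peer.getInputStream(), StandardCharsets.UTF_8))) {
                String line;
                // Print every line the client sent; the loop ends when the client closes the socket.
                while ((line = reader.readLine()) != null) {
                    System.out.println(line);
                }
            }
            sender.join();
        }
    }
}
```

To send to the real agent, call sendLines with the agent's address and the port from agent_tcp.conf (192.168.0.218 and 21111 in these notes).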
Flume 1.6.0 includes a Kafka source. An example agent configuration file:

front_agent_kafka.sources=as1
front_agent_kafka.channels=c1
front_agent_kafka.sinks=s1
front_agent_kafka.sources.as1.type=org.apache.flume.source.kafka.KafkaSource
front_agent_kafka.sources.as1.zookeeperConnect=192.168.0.20:2181
front_agent_kafka.sources.as1.topic=test
front_agent_kafka.sources.as1.groupId=flume
front_agent_kafka.sources.as1.batchSize=100
front_agent_kafka.sources.as1.channels=c1
front_agent_kafka.channels.c1.type=memory
front_agent_kafka.channels.c1.capacity=10000
front_agent_kafka.channels.c1.transactionCapacity=10000
front_agent_kafka.channels.c1.keep-alive=120
front_agent_kafka.channels.c1.byteCapacityBufferPercentage=20
front_agent_kafka.channels.c1.byteCapacity=800000
front_agent_kafka.sinks.s1.type=com.lefukj.flume.sinks.JdbcSink
front_agent_kafka.sinks.s1.channel=c1
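Every agent file in these notes follows the same wiring pattern: the agent declares its sources, channels, and sinks by name, each source lists its channels under "channels" (plural), and each sink names one channel under "channel" (singular). A typo in any of these names is easy to miss, so here is a minimal sketch of a wiring check. AgentWiringCheck is a hypothetical helper built only on java.util.Properties; it does not use or reproduce Flume's own validation.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;

/** Hypothetical helper: checks that sources and sinks refer to declared channels. */
final class AgentWiringCheck {

    /** Splits a space-separated name list such as "i1 i2"; a missing key yields an empty list. */
    static List<String> names(Properties props, String key) {
        String raw = props.getProperty(key, "").trim();
        return raw.isEmpty() ? new ArrayList<>() : Arrays.asList(raw.split("\\s+"));
    }

    /** Returns one message per wiring problem found for the given agent name. */
    static List<String> findWiringProblems(String agent, String configText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(configText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Set<String> channels = new HashSet<>(names(props, agent + ".channels"));
        List<String> problems = new ArrayList<>();

        for (String source : names(props, agent + ".sources")) {
            List<String> bound = names(props, agent + ".sources." + source + ".channels");
            if (bound.isEmpty()) {
                problems.add("source " + source + " has no channels");
            }
            for (String channel : bound) {
                if (!channels.contains(channel)) {
                    problems.add("source " + source + " -> undeclared channel " + channel);
                }
            }
        }
        for (String sink : names(props, agent + ".sinks")) {
            String channel = props.getProperty(agent + ".sinks." + sink + ".channel", "").trim();
            if (channel.isEmpty()) {
                problems.add("sink " + sink + " has no channel");
            } else if (!channels.contains(channel)) {
                problems.add("sink " + sink + " -> undeclared channel " + channel);
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        // Trimmed-down copy of the Kafka agent file, wiring lines only.
        String config = String.join("\n",
            "front_agent_kafka.sources=as1",
            "front_agent_kafka.channels=c1",
            "front_agent_kafka.sinks=s1",
            "front_agent_kafka.sources.as1.channels=c1",
            "front_agent_kafka.sinks.s1.channel=c1");
        System.out.println(findWiringProblems("front_agent_kafka", config));
    }
}
```

The agent name passed in must be the same name given to flume-ng with -n, since that prefix selects which properties belong to the agent.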