agentA (10.1.124.197), agentB (10.1.124.196), agentC (10.1.124.198)
Test 1
agentA => agentB: push data from the client to agentB, with the channel changed to jdbc to guarantee the reliability of event data.
agentA configuration:
## weblog agent config
#List sources, sinks and channels in the agent
agentB configuration:
# Define a memory channel called ch1 on agent1
agent1.channels.ch1.type = memory
# Define an Avro source called avro-source1 on agent1 and tell it
# Define a logger sink that simply logs all events it receives
# Finally, now that we've defined all of our components, tell
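The agentB listing keeps only its comments and a single property line. As a rough sketch of what those comments describe, a complete agent1 definition might look like the following. The sink name log-sink1 and the bind address 0.0.0.0 are assumptions, and port 41414 is taken from the avro sinks used in the later tests; none of these lines are recovered from the original file.

```properties
# Sketch only: every name and value below is an assumption, not the original file.
agent1.channels.ch1.type = memory

# Avro source that listens for events forwarded by upstream avro sinks
agent1.sources.avro-source1.type = avro
agent1.sources.avro-source1.bind = 0.0.0.0
agent1.sources.avro-source1.port = 41414
agent1.sources.avro-source1.channels = ch1

# Logger sink that writes every received event to the agent log
agent1.sinks.log-sink1.type = logger
agent1.sinks.log-sink1.channel = ch1

# Activate the components defined above
agent1.channels = ch1
agent1.sources = avro-source1
agent1.sinks = log-sink1
```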
Run on agentA: flume-ng agent -n agent1 -c /opt/apps/flume-ng/conf/ -f flume.conf &
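Test 2
Building on Test 1, continue testing event reliability: stop the Flume agent service on agentB.
Write test data on agentA, then start the Flume agent service on agentB again. The events held on agentA are synchronized to agentB.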
Test 3
agentA uses an exec source that runs tail, with agentB and agentC as the collection points.
|—————agentB
agentA
|—————agentC
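Test content: the source on agentA uses exec to run the tail command and read the nginx log, then forwards events through two sinks to agentB and agentC, to test load balancing and reliability.
agentB and agentC use the same configuration file as agentB in Test 1. The agentA configuration file is as follows: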
#List sources, sinks and channels in the agent
weblog-agent.sources = tail
weblog-agent.sinks = avro-forward-sink01 avro-forward-sink02
weblog-agent.channels = jdbc-channel
#define the flow
#webblog-agent sources config
weblog-agent.sources.tail.channels = jdbc-channel
weblog-agent.sources.tail.type = exec
weblog-agent.sources.tail.command = tail -f /opt/apps/nginx/logs/access.log
#avro sink properties
weblog-agent.sinks.avro-forward-sink01.channel = jdbc-channel
weblog-agent.sinks.avro-forward-sink01.type = avro
weblog-agent.sinks.avro-forward-sink01.hostname = 10.1.124.196
weblog-agent.sinks.avro-forward-sink01.port = 41414
#avro sink 02
weblog-agent.sinks.avro-forward-sink02.channel = jdbc-channel
weblog-agent.sinks.avro-forward-sink02.type = avro
weblog-agent.sinks.avro-forward-sink02.hostname = 10.1.124.198
weblog-agent.sinks.avro-forward-sink02.port = 41414
#channels config
#weblog-agent.channels.jdbc-channel.type = memory
weblog-agent.channels.jdbc-channel.type = jdbc
weblog-agent.channels.jdbc-channel.sysprop.user.home = /tmp/flumedb
Run on agentA: flume-ng agent -n weblog-agent -c /opt/apps/flume-ng/conf/ -f flume.conf &
Run on agentB and agentC: flume-ng agent -n agent1 -c /opt/apps/flume-ng/conf/ -f flume.conf &
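Test result: events from agentA are written to agentB and agentC in turn, similar to the LVS rr (round-robin) mode. After the Flume agent on agentC is stopped, events from agentA are written to agentB only. Once agentC is restored, events from agentA are again distributed in rotation across agentB and agentC.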
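The rotation observed in this test appears to come from two independent sinks draining the same channel. Flume also provides a load-balancing sink processor that makes the distribution policy explicit. A minimal sketch, assuming a sink group named group1 (a hypothetical name that is not part of the Test 3 file) and a Flume version that supports the load_balance processor:

```properties
# Assumed addition to the agentA file; group1 is a hypothetical group name.
weblog-agent.sinkgroups = group1
weblog-agent.sinkgroups.group1.sinks = avro-forward-sink01 avro-forward-sink02
weblog-agent.sinkgroups.group1.processor.type = load_balance
# round_robin is the default selector; random is the other built-in choice
weblog-agent.sinkgroups.group1.processor.selector = round_robin
# back off from a sink that has failed instead of retrying it on every pass
weblog-agent.sinkgroups.group1.processor.backoff = true
```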
Test 4
agentB and agentC provide HA for the sinks on agentA. This configuration uses Flume Sink Processors.
Compare this agentA configuration file with the one in Test 3:
#List sources, sinks and channels in the agent
weblog-agent.sources = tail
weblog-agent.sinks = avro-forward-sink01 avro-forward-sink02
weblog-agent.channels = jdbc-channel
#define the flow
#webblog-agent sources config
weblog-agent.sources.tail.channels = jdbc-channel
weblog-agent.sources.tail.type = exec
weblog-agent.sources.tail.command = tail -f /opt/apps/nginx/logs/access.log
#avro sink properties
weblog-agent.sinks.avro-forward-sink01.channel = jdbc-channel
weblog-agent.sinks.avro-forward-sink01.type = avro
weblog-agent.sinks.avro-forward-sink01.hostname = 10.1.124.196
weblog-agent.sinks.avro-forward-sink01.port = 41414
#avro sink 02
weblog-agent.sinks.avro-forward-sink02.channel = jdbc-channel
weblog-agent.sinks.avro-forward-sink02.type = avro
weblog-agent.sinks.avro-forward-sink02.hostname = 10.1.124.198
weblog-agent.sinks.avro-forward-sink02.port = 41414
#sink FailoverSink
weblog-agent.sinkgroups=group1
weblog-agent.sinkgroups.group1.sinks = avro-forward-sink01 avro-forward-sink02
weblog-agent.sinkgroups.group1.processor.type = failover
weblog-agent.sinkgroups.group1.processor.priority.avro-forward-sink01 = 5
weblog-agent.sinkgroups.group1.processor.priority.avro-forward-sink02 = 10
#channels config
#weblog-agent.channels.jdbc-channel.type = memory
weblog-agent.channels.jdbc-channel.type = jdbc
weblog-agent.channels.jdbc-channel.sysprop.user.home = /tmp/flumedb
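This defines a sink group (sinkgroups). Only one member of the group does the sink work at any given time. The larger the priority value, the higher the priority. When priority values are equal, the member that appears last is used.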
Test result: avro-forward-sink02 handles the sink work. After the Flume agent on agentC is stopped, events from agentA are sent to agentB through avro-forward-sink01.
After the Flume agent on agentC is restored, events are sent to agentC again.
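How quickly traffic returns to agentC depends partly on how long the failover processor keeps a failed sink in back-off. The failover processor exposes a maxpenalty setting (in milliseconds) that caps this period. A sketch of the one extra line, where the 10-second value is an assumption chosen only for illustration:

```properties
# Assumed tuning line for the failover group group1; 10000 ms is an illustrative value.
# The back-off for a failed sink grows on repeated failures up to this limit.
weblog-agent.sinkgroups.group1.processor.maxpenalty = 10000
```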
Test 5
agentC—-|
| ====> agentB
agentA—–|
Test content: aggregation. agentA and agentC use the same configuration file.
agentA & agentC
## weblog agent config
#List sources, sinks and channels in the agent
agentB
# Define a memory channel called ch1 on agent1
agent1.channels.ch1.type = memory
# Define an Avro source called avro-source1 on agent1 and tell it
# Define a logger sink that simply logs all events it receives
# Finally, now that we've defined all of our components, tell
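Test result: aggregation works correctly.
Test 6
Test content: the source selector type replicating, which I call log RAID 1. Events from agentA are written to both agentB and agentC.
The agentA configuration file is as follows: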
#List sources, sinks and channels in the agent
weblog-agent.sources = tail
weblog-agent.sinks = avro-forward-sink01 avro-forward-sink02
weblog-agent.channels = jdbc-channel01 jdbc-channel02
#define the flow
#webblog-agent sources config
weblog-agent.sources.tail.channels = jdbc-channel01 jdbc-channel02
weblog-agent.sources.tail.type = exec
weblog-agent.sources.tail.command = tail -f /opt/apps/nginx/logs/access.log
weblog-agent.sources.tail.selector.type = replicating
#avro sink properties
weblog-agent.sinks.avro-forward-sink01.channel = jdbc-channel01
weblog-agent.sinks.avro-forward-sink01.type = avro
weblog-agent.sinks.avro-forward-sink01.hostname = 10.1.124.196
weblog-agent.sinks.avro-forward-sink01.port = 41414
#avro sink 02
weblog-agent.sinks.avro-forward-sink02.channel = jdbc-channel02
weblog-agent.sinks.avro-forward-sink02.type = avro
weblog-agent.sinks.avro-forward-sink02.hostname = 10.1.124.198
weblog-agent.sinks.avro-forward-sink02.port = 41414
#channels config
#weblog-agent.channels.jdbc-channel.type = memory
weblog-agent.channels.jdbc-channel01.type = jdbc
weblog-agent.channels.jdbc-channel01.sysprop.user.home = /tmp/flumedb01
weblog-agent.channels.jdbc-channel02.type = jdbc
weblog-agent.channels.jdbc-channel02.sysprop.user.home = /tmp/flumedb02
Test result: the events that agentB and agentC receive from agentA are identical. Note the timing: agentC collects its events somewhat later than agentB.
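With a replicating selector, every listed channel is required by default, so a failed write to either channel fails the source's put. If one copy only needs to be best-effort, the replicating selector lets a channel be marked optional. A sketch, assuming jdbc-channel02 (the path to agentC) is the copy allowed to miss events and that the Flume version in use supports selector.optional:

```properties
weblog-agent.sources.tail.selector.type = replicating
# Assumption: jdbc-channel02 is treated as best-effort, so write failures to it are ignored.
weblog-agent.sources.tail.selector.optional = jdbc-channel02
```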