116 views

Flume+Kafka+Storm+Redis搭建互联网大数据实时处理系统:实时统计网站地址PV、UV+展现

By | 2019年2月22日

1 大数据处理的常见方式方法:
前边得出的那一篇文章是应用场景MapReduce的离线大数据分析案例,其根据对网站造成的客户浏览日记进行解决并分析出该网站在某天的PV、UV等数据,对应上面的图示,其走的就是离线解决的数据处理方法形式,而这儿即将要介紹的是另外一条线路的数据处理方法形式,即基于Storm的在线解决,在下边得出的完整案例中,人们将会完成下边的几类工作:

1.怎样一步一步构建我们的实时处理系统(Flume+Kafka+Storm+Redis)
2.实时处理网站的客户浏览日志,并统计分析出该网站的PV、UV
3.将即时分析出的PV、UV动态地展现在我们的前面网页页面上
当你对上边谈及的互联网大数据部件已经有所了解,或者对怎样构建大数据实时处理系统很感兴趣,那麼就可以畅快阅读下面的內容了。

必须注意的是,核心内容取决于怎样搭建实时处理系统,而这儿得出的案例是即时统计某一网站的PV、UV,在实际中,基于每个人的环境不同,业务不同,因此信息化系统的时间复杂度也各有不同,相对而言,这儿统计PV、UV的业务是非常简单的,但也足够让我们互联网大数据实时处理系统有一个基本的、清晰的了解与认识,是的,它不再那麼神密了。
2 实时处理系统架构图

我们的实时处理系统总体构架下表中:

Flume+Kafka+Storm+Redis构建大数据实时处理系统:实时统计网站PV、UV+展示

即从上面的构架中我们可以看得出,其由下面的几一部分组成:

Flume集群
Kafka集群
Storm集群
从构建实时处理系统的角度出发,我们必须做的是,怎么让数据在每个不同的集群系统之间打通(从上边的图例中也能非常好地表明这一点),即需要做系统结构之前的整合,包括Flume与Kafka的整合,Kafka与Storm的整合。这样的话,各个环境是不是应用群集,依个人的实际需要而定,在我们的环境中,Flume、Kafka、Storm都应用群集。

3 Flume+Kafka融合

Flume+Kafka+Storm+Redis构建互联网大数据实时处理系统:实时统计网站PV、UV+展示

3.1 整合构思

对于Flume来讲,关键所在怎样收集数据,并且将其发送至Kafka上,而且因为我们这儿了使用Flume集群的方式,Flume集群的配备也是非常重要的。而对于Kafka,重要就是怎样接受来源于Flume的数据。从总体上讲,逻辑应该是非常简单的,即可以在Kafka中建立一个用以我们实时处理系统的topic,然后Flume将其采集到的数据发送至该topic上即可。

3.2 融合过程:Flume群集配置与Kafka Topic建立

3.2.1 Flume群集配置

Flume+Kafka+Storm+Redis构建互联网大数据实时处理系统:实时统计网站PV、UV+展示

在我们的场景中,两个Flume Agent各自部署在两台Web网络服务器上,用于采集Web网络服务器上的日志数据,然后其数据的下移方式都为发送至另外一个Flume Agent上,因此这儿我们需要配置三个Flume Agent.

3.2.1.1 Flume Agent01

该Flume Agent部署在一台Web网络服务器上,用于收集产生的Web日记,然后发送至Flume Consolidation Agent上,创建一个新的配置文件flume-sink-avro.conf,其配置内容下表中:

#########################################################
##
##关键功效是监听文件中的新增统计数据,采集到统计数据之后,输出到avro
##    注意:Flume agent的运行,主要也是配置source channel sink
##  下边的a1就是agent的代号,source叫r1 channel叫c1 sink叫k1
#########################################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#对于source的配置叙述 监听文档中的新增数据 exec
a1.sources.r1.type = exec
a1.sources.r1.command  = tail -F /home/uplooking/data/data-clean/data-access.log

#对于sink的配置叙述 使用avro日志做数据的消費
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = uplooking03
a1.sinks.k1.port = 44444

#对于channel的配置叙述 使用文档做数据的临时缓存文件 这种的安全系数要高
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/uplooking/data/flume/checkpoint
a1.channels.c1.dataDirs = /home/uplooking/data/flume/data

#通过channel c1将source r1和sink k1关联起來
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
配置成功后, 启动Flume Agent,只能对日志文档进行监听:

$ flume-ng agent –conf conf -n a1 -f app/flume/conf/flume-sink-avro.conf >/dev/null 2>&1\#&
3.2.1.2 Flume Agent02

该Flume Agent部署在一台Web网络服务器上,用于采集造成的Web日志,然后发送至Flume Consolidation Agent上,创建一个新的配置文件flume-sink-avro.conf,其配置内容如下:

#########################################################
##
##主要作用是监听文档中的新增数据,采集到数据之后,输出到avro
##    注意:Flume agent的运行,主要就是配置source channel sink
##  下边的a1就是agent的代号,source叫r1 channel叫c1 sink叫k1
#########################################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#对于source的配备描述 监听文档中的新增数据 exec
a1.sources.r1.type = exec
a1.sources.r1.command  = tail -F /home/uplooking/data/data-clean/data-access.log

#对于sink的配置叙述 使用avro日志做数据的消费
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = uplooking03
a1.sinks.k1.port = 44444

#对于channel的配置叙述 使用文件做数据的临时缓存文件 这种的安全性要高
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/uplooking/data/flume/checkpoint
a1.channels.c1.dataDirs = /home/uplooking/data/flume/data

#通过channel c1将source r1和sink k1关联起来
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
配备完成后, 启动Flume Agent,即可对日志文档进行监听:

$ flume-ng agent –conf conf -n a1 -f app/flume/conf/flume-sink-avro.conf >/dev/null 2>&1\#&
3.2.1.3 Flume Consolidation Agent

Flume+Kafka+Storm+Redis构建互联网大数据实时处理系统:实时统计网站PV、UV+展示

该Flume Agent用以接收其他两个Agent发送到过来的数据,然后将其发送至Kafka上,创建一个新的配置文件flume-source_avro-sink_kafka.conf,配置內容下表:

#########################################################
##
##主要作用是监听文件目录中的新增文档,采集到数据以后,输出到kafka
##    注意:Flume agent的运行,关键就是配置source channel sink
##  下面的a1就是agent的代号,source叫r1 channel叫c1 sink叫k1
#########################################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#对于source的配置叙述 监听avro
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444

#对于sink的配置叙述 使用kafka做数据的消费
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = f-k-s
a1.sinks.k1.brokerList = uplooking01:9092,uplooking02:9092,uplooking03:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20

#对于channel的配置描述 使用内存缓存地区做数据的临时缓存
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#通过channel c1将source r1和sink k1关系起来
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
配置成功后, 启动Flume Agent,即可对avro的数据进行监听:

$ flume-ng agent –conf conf -n a1 -f app/flume/conf/flume-source_avro-sink_kafka.conf >/dev/null 2>&1\#&
3.2.2 Kafka配置

在我们的Kafka中,先建立一个topic,用于后边接收Flume采集回来的数据:

kafka-topics.sh –create –topic f-k-s  –zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181 –partitions 3 –replication-factor 3
3.3 整合认证

起动Kafka的消费脚本:

$ kafka-console-consumer.sh –topic f-k-s –zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181
假如在Web网络服务器上有增加的日志数据,就会被我们的Flume程序监听到,并且最终会传输到到Kafka的f-k-stopic中,这儿做为认证,我们上边启动的是一个kafka终端消费的脚本,这时候会在终端中见到数据的输出:

$ kafka-console-consumer.sh –topic f-k-s –zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181
1003    221.8.9.6 80    0f57c8f5-13e2-428d-ab39-9e87f6e85417    10709   0       GET /index HTTP/1.1     null    null      Mozilla/5.0 (Windows; U; Windows NT 5.2)Gecko/2008070208 Firefox/3.0.1  1523107496164
1002    220.194.55.244  fb953d87-d166-4cb4-8a64-de7ddde9054c    10201   0       GET /check/detail HTTP/1.1      null      null    Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko    1523107497165
1003    211.167.248.22  9d7bb7c2-00bf-4102-9c8c-3d49b18d1b48    10022   1       GET /user/add HTTP/1.1  null    null      Mozilla/4.0 (compatible; MSIE 8.0; Windows NT6.0)       1523107496664
1002    61.172.249.96   null    10608   0       POST /updateById?id=21 HTTP/1.1 null    null    Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko      1523107498166
1000    202.98.11.101   aa7f62b3-a6a1-44ef-81f5-5e71b5c61368    20202   0       GET /getDataById HTTP/1.0       404       /check/init     Mozilla/5.0 (Windows; U; Windows NT 5.1)Gecko/20070803 Firefox/1.5.0.12 1523107497666
更何况,我们的融合就没有问题,当然kafka中的数据应当是由我们的storm来进行消費的,这儿只是做为融合的一个测试,下边就会来做kafka+storm的融合。

4 Kafka+Storm融合

Flume+Kafka+Storm+Redis构建大数据实时处理系统:实时统计网站PV、UV+展示

Kafka和Storm的融合其实在Storm的官网上也是十分详尽清晰的文本文档:http://storm.apache.org/releases/1.0.6/storm-kafka.html,想对其有更多知道的同学可以参照一下。

4.1 融合思路

在这次的大数据实时处理系统的构建中,Kafka等于是作为消息队列(或者说是消息中间件)的角色,其产生的消息需要有顾客去消費,所以Kafka与Storm的整合,关键所在我们的Storm如何去消费Kafka消息topic中的消息(kafka消息topic中的消息正是由Flume采集而成,现在我们必须在Storm中对其进行消費)。

在Storm中,topology是非常重要的定义。

对比MapReduce,在MapReduce中,我们提交申请的作业称之为一个job,在一个Job中,又包含若干个Mapper和Reducer,正是在Mapper和Reducer中有对于数据的解决逻辑:

Flume+Kafka+Storm+Redis构建大数据实时处理系统:实时统计网站PV、UV+展示

在Storm中,我们提交的一个作业称之为topology,其又包含了spout和bolt,在Storm中,对数据的处理逻辑正是在spout和bolt中反映:

Flume+Kafka+Storm+Redis搭建大数据实时处理系统:实时统计网站PV、UV+展示

即在spout中,更是我们数据的来源于,又因为其实时的特性,因此可以把它比成一个“自来水龙头”,表示其绵绵不绝地产生数据:

Flume+Kafka+Storm+Redis构建大数据实时处理系统:实时统计网站PV、UV+展示

因此,问题的关键是spout怎样去获得来源于kafka的数据?

好在,storm-kafka的整合库中提供了这样的API来供我们进行操作。

4.2 融合过程:KafkaSpout的应用

在源代码的逻辑中只需要建立一个由storm-kafkaAPI提供的KafkaSpout对象即可:

SpoutConfig spoutConf = new SpoutConfig(hosts, topic, zkRoot, id);
return new KafkaSpout(spoutConf);
下边给出完整的融合源代码:

package cn.xpleaf.bigdata.storm.statics;

import kafka.api.OffsetRequest;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

/**
* Kafka和storm的融合,用于统计实时流量相匹配的pv和uv
*/
public class KafkaStormTopology {

//    static class MyKafkaBolt extends BaseRichBolt {
static class MyKafkaBolt extends BaseBasicBolt {

/**
* kafkaSpout发送的字段名为bytes
*/
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
byte[] binary = input.getBinary(0); // 跨jvm传送数据,接收到的是字节数据
//            byte[] bytes = input.getBinaryByField(“bytes”);   // 这种方法也行
String line = new String(binary);
System.out.println(line);
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {

}
}

public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
/**
* 设置spout和bolt的dag(有向无环图)
*/
KafkaSpout kafkaSpout = createKafkaSpout();
builder.setSpout(“id_kafka_spout”, kafkaSpout);
builder.setBolt(“id_kafka_bolt”, new MyKafkaBolt())
.shuffleGrouping(“id_kafka_spout”); // 根据不同的数据流转方式,来指定数据的上游组件
// 使用builder构建topology
StormTopology topology = builder.createTopology();
String topologyName = KafkaStormTopology.class.getSimpleName();  // 拓扑的名称
Config config = new Config();   // Config()对象承继自HashMap,但本身封裝了一些基本的配备

// 启动topology,当地启动使用LocalCluster,群集启动使用StormSubmitter
if (args == null || args.length < 1) {  // 没有主要参数时使用当地模式,有参数时使用群集模式
LocalCluster localCluster = new LocalCluster(); // 本地开发方式,创建的对象为LocalCluster
localCluster.submitTopology(topologyName, config, topology);
} else {
StormSubmitter.submitTopology(topologyName, config, topology);
}
}

/**
* BrokerHosts hosts  kafka群集列表
* String topic       要消费的topic主题
* String zkRoot      kafka在zk中的目录(会在该节点目录下记录读取kafka消息的偏移)
* String id          当今操作的标志id
*/
private static KafkaSpout createKafkaSpout() {
String brokerZkStr = “uplooking01:2181,uplooking02:2181,uplooking03:2181”;
BrokerHosts hosts = new ZkHosts(brokerZkStr);   // 通过zookeeper中的/brokers即可找到kafka的地址
String topic = “f-k-s”;
String zkRoot = “/”\#+ topic;
String id = “consumer-id”;
SpoutConfig spoutConf = new SpoutConfig(hosts, topic, zkRoot, id);
// 本地环境设定之后,也可以在zk中创建/f-k-s节点,在群集环境中,不用配备也可以在zk中建立/f-k-s节点
//spoutConf.zkServers = Arrays.asList(new String[]{“uplooking01”, “uplooking02”, “uplooking03”});
//spoutConf.zkPort = 2181;
spoutConf.startOffsetTime = OffsetRequest.LatestTime(); // 设定之后,刚启动就不会把之前的消费也进行读取,会从最新的偏移开始读取
return new KafkaSpout(spoutConf);
}
}
其实代码的逻辑非常简单,我们只创建了 一个由storm-kafka提供的KafkaSpout对象和一个包含我们处理逻辑的MyKafkaBolt对象,MyKafkaBolt的逻辑也很简单,就是把kafka的消息打印到控制台上。

需要注意的是,后面我们分析网站PV、UV的工作,正是在上面这部分简单的代码中完成的,所以其是非常重要的基础。
4.3 整合验证

上面的整合代码,可以在本地环境中运行,也可以将其打包成jar包上传到我们的Storm集群中并提交业务来运行。如果Web服务器能够产生日志,并且前面Flume+Kafka的整合也没有问题的话,将会有下面的效果。

如果是在本地环境中运行上面的代码,那么可以在控制台中看到日志数据的输出:

……
45016548 [Thread-16-id_kafka_spout-executor[3 3]] INFO  o.a.s.k.ZkCoordinator – Task [1/1] Refreshing partition manager connections
45016552 [Thread-16-id_kafka_spout-executor[3 3]] INFO  o.a.s.k.DynamicBrokersReader – Read partition info from zookeeper: GlobalPartitionInformation{topic=f-k-s, partitionMap={0=uplooking02:9092, 1=uplooking03:9092, 2=uplooking01:9092}}
45016552 [Thread-16-id_kafka_spout-executor[3 3]] INFO  o.a.s.k.KafkaUtils – Task [1/1] assigned [Partition{host=uplooking02:9092, topic=f-k-s, partition=0}, Partition{host=uplooking03:9092, topic=f-k-s, partition=1}, Partition{host=uplooking01:9092, topic=f-k-s, partition=2}]
45016552 [Thread-16-id_kafka_spout-executor[3 3]] INFO  o.a.s.k.ZkCoordinator – Task [1/1] Deleted partition managers: []
45016552 [Thread-16-id_kafka_spout-executor[3 3]] INFO  o.a.s.k.ZkCoordinator – Task [1/1] New partition managers: []
45016552 [Thread-16-id_kafka_spout-executor[3 3]] INFO  o.a.s.k.ZkCoordinator – Task [1/1] Finished refreshing
1003    221.8.9.6 80    0f57c8f5-13e2-428d-ab39-9e87f6e85417    10709   0   GET /index HTTP/1.1 null    null    Mozilla/5.0 (Windows; U; Windows NT 5.2)Gecko/2008070208 Firefox/3.0.1  1523107496164
1000    202.98.11.101   aa7f62b3-a6a1-44ef-81f5-5e71b5c61368    20202   0   GET /getDataById HTTP/1.0   404 /check/init Mozilla/5.0 (Windows; U; Windows NT 5.1)Gecko/20070803 Firefox/1.5.0.12 1523107497666
1002    220.194.55.244  fb953d87-d166-4cb4-8a64-de7ddde9054c    10201   0   GET /check/detail HTTP/1.1  null    null    Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko    1523107497165
1003    211.167.248.22  9d7bb7c2-00bf-4102-9c8c-3d49b18d1b48    10022   1   GET /user/add HTTP/1.1  null    null    Mozilla/4.0 (compatible; MSIE 8.0; Windows NT6.0)   1523107496664
1002    61.172.249.96   null    10608   0   POST /updateById?id=21 HTTP/1.1 null    null    Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko    1523107498166
……
假如是在Storm群集中提交的作业运行,那麼还可以在Storm的日志中看到web网络服务器产生的日志数据:

Flume+Kafka+Storm+Redis构建大数据实时处理系统:即时统计网站PV、UV+展示

更何况就成功了Kafka+Storm的融合。

5 Storm+Redis融合

Flume+Kafka+Storm+Redis构建大数据实时处理系统:实时统计网站PV、UV+展示

5.1 整合思路

我觉得所谓Storm和Redis的融合,指的是在我们的实时处理系统中的数据的落地式方式,即在Storm中包含了我们解决数据的逻辑,而数据处理方法完毕后,产生的数据处理方法结果该保存到哪些地方呢?显然就有很多种方式了,关系数据库、NoSQL、HDFS、HBase等,这应该在于具体的业务和数据量,这里,我们使用Redis来进行最终分析数据的储存。

所以事实上做那步的融合,其实就是开始写我们的业务解决代码了,由于通过前面Flume-Kafka-Storm的融合,已经打通了整个数据的流通路径,接下来关键要做的是,在Storm中,怎样解决我们的数据并储存到Redis中。

而在Storm中,spout已经不用我们来写了(由storm-kafka的API提供了KafkaSpout对象),所以问题就变成,怎样根据业务编写分析解决数据的bolt。

5.2 整合过程:编写Storm业务处理Bolt

5.2.1 日志分析

我们实时获得的日志格式下表:

1002    202.103.24.68   1976dc2e-f03a-44f0-892f-086d85105f7e    14549   1       GET /top HTTP/1.1       200     /tologin  Mozilla/5.0 (Windows; U; Windows NT 5.2)AppleWebKit/525.13 (KHTML, like Gecko) Version/3.1Safari/525.13 1523806916373
1000    221.8.9.6 80    542ccf0a-9b14-49a0-93cd-891d87ddabf3    12472   1       GET /index HTTP/1.1     500     /top      Mozilla/4.0 (compatible; MSIE 5.0; WindowsNT)   1523806916874
1003    211.167.248.22  0e4c1875-116c-400e-a4f8-47a46ad04a42    12536   0       GET /tologin HTTP/1.1   200     /stat     Mozilla/5.0 (Windows; U; Windows NT 5.2) AppleWebKit/525.13 (KHTML,like Gecko) Chrome/0.2.149.27 Safari/525.13    1523806917375
1000    219.147.198.230 07eebc1a-740b-4dac-b53f-bb242a45c901    11847   1       GET /userList HTTP/1.1  200     /top      Mozilla/4.0 (compatible; MSIE 6.0; Windows NT5.1)       1523806917876
1001    222.172.200.68  4fb35ced-5b30-483b-9874-1d5917286675    13550   1       GET /getDataById HTTP/1.0       504       /tologin        Mozilla/5.0 (Windows; U; Windows NT 5.2)AppleWebKit/525.13 (KHTML, like Gecko) Version/3.1Safari/525.13   1523806918377
其中必须表明的是下一个字段和第四个字段,由于它对我们统计pv和uv非常有帮助,他们各自是ip字段和mid字段,表明下表:

ip:客户的ip地址
mid:唯一的id,此id第一次会种在网页的cookie里。假如存在则不再种。做为电脑浏览器唯一标示。手机端或者pad直接取机器码。
因而,根据ip地址,我们可以根据查寻获得其所在的省份,而且建立一个属于该省区的变量,用以记录pv数,每来一条属于该省区的日志记录,则该省区的pv就加1,以此来成功pv的统计。

而针对mId,我们则可以建立归属于该省的一个set集合,每来一条归属于该省区的日志记录,则可以将该mid添加到set集合中,因为set集合存放的不是重复的数据,这样就可以帮我们全自动过滤掉重复的mid,根据set集合的大小,就可以统计出uv。

在我们storm的业务解决源代码中,我们必须编写两个bolt:

第一个bolt用来对数据进行数据预处理,也就是获取我们必须的ip和mid,而且根据IP查询得到省区信息;
第二个bolt用来统计pv、uv,并定时将pv、uv数据写入到Redis中;
这样的话上边只是说明了整体的思路,事实上还有许多必须注意的细节问题和技巧问题,这都会我们的代码中进行体现,我在后边写的源代码中都加了非常详尽的注解进行说明。

5.2.2 编写第一个Bolt:ConvertIPBolt

依据上面的分析,编辑用于数据预处理的bolt,源代码下表:

package cn.xpleaf.bigdata.storm.statistic;

import cn.xpleaf.bigdata.storm.utils.JedisUtil;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import redis.clients.jedis.Jedis;

/**
* 日志数据预处理Bolt,实现功能:
*     1.提取实现业务需求所需要的信息:ip地址、客户端唯一一个标识mid
*     2.查看IP地址归属地,并发送到下一个Bolt
*/
public class ConvertIPBolt extends BaseBasicBolt {
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
byte[] binary = input.getBinary(0);
String line = new String(binary);
String[] fields = line.split(“\t”);

if(fields == null || fields.length < 10) {
return;
}

// 获取ip和mid
String ip = fields[1];
String mid = fields[2];

// 根据ip获取其归属地(省份)
String province = null;
if (ip != null) {
Jedis jedis = JedisUtil.getJedis();
province = jedis.hget(“ip_info_en”, ip);
// 需要释放jedis的资源,否则会报can not get resource from the pool
JedisUtil.returnJedis(jedis);
}

// 发送数据到下一个bolt,只发送实现业务功能需要的province和mid
collector.emit(new Values(province, mid));

}

/**
* 定义了发送到下一个bolt的数据包含两个域:province和mid
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields(“province”, “mid”));
}
}
5.2.3 编写第二个Bolt:StatisticBolt

这个bolt包含我们统计网站pv、uv的代码逻辑,因此非常重要,其代码如下:

package cn.xpleaf.bigdata.storm.statistic;

import cn.xpleaf.bigdata.storm.utils.JedisUtil;
import org.apache.storm.Config;
import org.apache.storm.Constants;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
import redis.clients.jedis.Jedis;

import java.text.SimpleDateFormat;
import java.util.*;

/**
* 日志数据统计Bolt,实现功能:
* 1.统计各省份的PV、UV
* 2.以天为单位,将省份对应的PV、UV信息写入Redis
*/
public class StatisticBolt extends BaseBasicBolt {

Map<String, Integer> pvMap = new HashMap<>();
Map<String, HashSet<String>> midsMap = null;
SimpleDateFormat sdf = new SimpleDateFormat(“yyyyMMdd”);

@Override
public void execute(Tuple input, BasicOutputCollector collector) {
if (!input.getSourceComponent().equalsIgnoreCase(Constants.SYSTEM_COMPONENT_ID)) {  // 如果收到非系统级别的tuple,统计信息到局部变量mids
String province = input.getStringByField(“province”);
String mid = input.getStringByField(“mid”);
pvMap.put(province, pvMap.get(province) + 1);   // pv+1
if(mid != null) {
midsMap.get(province).add(mid); // 将mid添加到该省份所对应的set中
}
} else {    // 如果收到系统级别的tuple,则将数据更新到Redis中,释放JVM堆内存空间
/*
* 以 广东 为例,其在Redis中保存的数据格式如下:
* guangdong_pv(Redis数据结构为hash)
*         –20180415
*              –pv数
*         –20180416
*              –pv数
* guangdong_mids_20180415(Redis数据结构为set)
*         –mid
*         –mid
*         –mid
*         ……
* guangdong_mids_20180415(Redis数据结构为set)
*         –mid
*         –mid
*         –mid
*         ……
*/
Jedis jedis = JedisUtil.getJedis();
String dateStr = sdf.format(new Date());
// 更新pvMap数据到Redis中
String pvKey = null;
for(String province : pvMap.keySet()) {
int currentPv = pvMap.get(province);
if(currentPv > 0) { // 当前map中的pv大于0才更新,否则没有意义
pvKey = province + “_pv”;
String oldPvStr = jedis.hget(pvKey, dateStr);
if(oldPvStr == null) {
oldPvStr = “0”;
}
Long oldPv = Long.valueOf(oldPvStr);
jedis.hset(pvKey, dateStr, oldPv + currentPv + “”);
pvMap.replace(province, 0); // 将该省的pv重新设置为0
}
}
// 更新midsMap到Redis中
String midsKey = null;
HashSet<String> midsSet = null;
for(String province: midsMap.keySet()) {
midsSet = midsMap.get(province);
if(midsSet.size() > 0) {  // 当前省份的set的大小大于0才更新到,否则没有意义
midsKey = province + “_mids_” + dateStr;
jedis.sadd(midsKey, midsSet.toArray(new String[midsSet.size()]));
midsSet.clear();
}
}
// 释放jedis资源
JedisUtil.returnJedis(jedis);
System.out.println(System.currentTimeMillis() + “——->写入数据到Redis”);
}
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {

}

/**
* 设置定时任务,只对当前bolt有效,系统会定时向StatisticBolt发送一个系统级别的tuple
*/
@Override
public Map<String, Object> getComponentConfiguration() {
Map<String, Object> config = new HashMap<>();
config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
return config;
}

/**
* 初始化各个省份的pv和mids信息(用来临时存储统计pv和uv需要的数据)
*/
public StatisticBolt() {
pvMap = new HashMap<>();
midsMap = new HashMap<String, HashSet<String>>();
String[] provinceArray = {“shanxi”, “jilin”, “hunan”, “hainan”, “xinjiang”, “hubei”, “zhejiang”, “tianjin”, “shanghai”,
“anhui”, “guizhou”, “fujian”, “jiangsu”, “heilongjiang”, “aomen”, “beijing”, “shaanxi”, “chongqing”,
“jiangxi”, “guangxi”, “gansu”, “guangdong”, “yunnan”, “sicuan”, “qinghai”, “xianggang”, “taiwan”,
“neimenggu”, “henan”, “shandong”, “shanghai”, “hebei”, “liaoning”, “xizang”};
for(String province : provinceArray) {
pvMap.put(province, 0);
midsMap.put(province, new HashSet());
}
}
}
5.2.4 编写Topology

我们需要编写一个topology用来组织前面编写的Bolt,代码如下:

package cn.xpleaf.bigdata.storm.statistic;

import kafka.api.OffsetRequest;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.topology.TopologyBuilder;

/**
* 构建topology
*/
public class StatisticTopology {
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
/**
* 设置spout和bolt的dag(有向无环图)
*/
KafkaSpout kafkaSpout = createKafkaSpout();
builder.setSpout(“id_kafka_spout”, kafkaSpout);
builder.setBolt(“id_convertIp_bolt”, new ConvertIPBolt()).shuffleGrouping(“id_kafka_spout”); // 通过不同的数据流转方式,来指定数据的上游组件
builder.setBolt(“id_statistic_bolt”, new StatisticBolt()).shuffleGrouping(“id_convertIp_bolt”); // 通过不同的数据流转方式,来指定数据的上游组件
// 使用builder构建topology
StormTopology topology = builder.createTopology();
String topologyName = KafkaStormTopology.class.getSimpleName();  // 拓扑的名称
Config config = new Config();   // Config()对象继承自HashMap,但本身封装了一些基本的配置

// 启动topology,本地启动使用LocalCluster,集群启动使用StormSubmitter
if (args == null || args.length < 1) {  // 没有参数时使用本地模式,有参数时使用集群模式
LocalCluster localCluster = new LocalCluster(); // 本地开发模式,创建的对象为LocalCluster
localCluster.submitTopology(topologyName, config, topology);
} else {
StormSubmitter.submitTopology(topologyName, config, topology);
}
}

/**
* BrokerHosts hosts  kafka集群列表
* String topic       要消费的topic主题
* String zkRoot      kafka在zk中的目录(会在该节点目录下记录读取kafka消息的偏移量)
* String id          当前操作的标识id
*/
private static KafkaSpout createKafkaSpout() {
String brokerZkStr = “uplooking01:2181,uplooking02:2181,uplooking03:2181”;
BrokerHosts hosts = new ZkHosts(brokerZkStr);   // 通过zookeeper中的/brokers即可找到kafka的地址
String topic = “f-k-s”;
String zkRoot = “/” + topic;
String id = “consumer-id”;
SpoutConfig spoutConf = new SpoutConfig(hosts, topic, zkRoot, id);
// 本地环境设置之后,也可以在zk中建立/f-k-s节点,在集群环境中,不用配置也可以在zk中建立/f-k-s节点
//spoutConf.zkServers = Arrays.asList(new String[]{“uplooking01”, “uplooking02”, “uplooking03”});
//spoutConf.zkPort = 2181;
spoutConf.startOffsetTime = OffsetRequest.LatestTime(); // 设置以后,刚启动就不会把之前的消费进行读取,会从最新的偏移量开始读取
return new KafkaSpout(spoutConf);
}
}
写此文,一来是对自己实践中的一些总结,二来也是希望把一些比较不错的项目案例分享给大家,总之希望能够对大家有所帮助。
本文转载于:http://win-man.com/faq
本文关键词:云漫源代码咨询 服务器租用 高防CDN
作者:服务器安全技术员

发表评论

电子邮件地址不会被公开。 必填项已用*标注