
Offline Hotspot-City Statistics with Spark

fjs 大数据学习与分享 2022-07-09
I. Requirements Analysis
Using log data collected from websites or telecom carriers, together with IP-range information for every city in the country, we determine which IP range each user's address falls into and aggregate the locations where users are concentrated. The resulting statistics give the business team the data it needs to draw hotspot-city maps.

II. Data Preparation
1. City IP-range data
// start IP | end IP | start IP as number | end IP as number | ... | longitude | latitude
1.86.64.0|1.86.95.255|22429696|22437887|亚洲|中国|陕西|延安||电信|610600|China|CN|109.49081|36.596537
1.86.96.0|1.86.111.255|22437888|22441983|亚洲|中国|陕西|铜川||电信|610200|China|CN|108.963122|34.90892
1.86.112.0|1.86.127.255|22441984|22446079|亚洲|中国|陕西|咸阳||电信|610400|China|CN|108.705117|34.333439
1.86.192.0|1.86.223.255|22462464|22470655|亚洲|中国|陕西|安康||电信|610900|China|CN|109.029273|32.6903
1.87.152.0|1.87.175.255|22517760|22523903|亚洲|中国|陕西|西安||电信|610100|China|CN|108.948024|34.263161
1.87.176.0|1.87.191.255|22523904|22527999|亚洲|中国|陕西|咸阳||电信|610400|China|CN|108.705117|34.333439
1.88.0.0|1.91.255.255|22544384|22806527|亚洲|中国|北京|北京||歌华有线|110100|China|CN|116.405285|39.904989
1.92.0.0|1.93.255.255|22806528|22937599|亚洲|中国|北京|北京||鹏博士|110100|China|CN|116.405285|39.904989
······
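As a quick standalone sanity check of the field layout (plain Scala, no Spark; the record is the first sample line above, and the field indices are the ones the job below relies on), splitting one record on `|`:

```scala
object IpRecordParse {
  def main(args: Array[String]): Unit = {
    // First record from the sample ip.txt data above
    val line = "1.86.64.0|1.86.95.255|22429696|22437887|亚洲|中国|陕西|延安||电信|610600|China|CN|109.49081|36.596537"
    val f = line.split("\\|")
    // Fields 2 and 3 are the numeric bounds of the IP range
    assert(f(2) == "22429696" && f(3) == "22437887")
    // Fields 6, 7, 9, 13, 14 are province, city, ISP, longitude, latitude
    assert(s"${f(6)}-${f(7)}-${f(9)}-${f(13)}-${f(14)}" == "陕西-延安-电信-109.49081-36.596537")
  }
}
```

Note that field 8 (between city and ISP) is empty in the sample data, which is why the ISP sits at index 9, not 8.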
2. Collected IP access logs
// We mainly care about the IP field, e.g. 115.120.36.118, which usually comes from base-station data
20090121000132581311000|115.120.36.118|tj.tt98.com|/tj.htm|Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; TheWorld)|http://www.tt98.com/|
20090121000132864647000|123.197.64.247|cul.sohu.com|/20071227/n254338813_22.shtml|Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; TheWorld)|http://cul.sohu.com/20071227/n254338813_22.shtml|ArticleTab=visit:1; IPLOC=unknown; SUV=0901080709152121; vjuids=832dd37a1.11ebbc5d590.0.b20f858f14e918; club_chat_ircnick=JaabvxC4aaacQ; spanel=%7B%22u%22%3A%22%22%7D; vjlast=1232467312,1232467312,30
20090121000133296729000|222.55.57.176|down.chinaz.com|/|Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; iCafeMedia; TencentTraveler 4.0)||cnzz_a33219=0; vw33219=%3A18167791%3A; sin33219=http%3A//www.itxls.com/wz/wyfx/it.html; rtime=0; ltime=1232464387281; cnzz_eid=6264952-1232464379-http%3A//www.itxls.com/wz/wyfx/it.html
20090121000133331104000|123.197.66.93|www.pkwutai.cn|/down/downLoad-id-45383.html|Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; QQDownload 1.7)|http://www.baidu.com/s?tn=b1ank_pg&ie=gb2312&bs=%C3%C0%C6%BC%B7%FE%D7%B0%B9%DC%C0%ED%C8%ED%BC%FE&sr=&z=&cl=3&f=8&wd=%C6%C6%BD%E2%C3%C0%C6%BC%B7%FE%D7%B0%B9%DC%C0%ED%C8%ED%BC%FE&ct=0|
20090121000133446262000|115.120.12.157|v.ifeng.com|/live/|Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1) ; .NET CLR 2.0.50727; CIBA)|http://www.ifeng.com/|userid=1232466610953_4339; location=186; sclocationid=10002; vjuids=22644b162.11ef4bc1624.0.63ad06717b426; vjlast=1232466614,1232467297,13
20090121000133456256000|115.120.7.240|cqbbs.soufun.com|/3110502342~-1~2118/23004348_23004348.htm|Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1) ; .NET CLR 2.0.50727; CIBA)||new_historysignlist=%u534E%u6DA6%u4E8C%u5341%u56DB%u57CE%7Chttp%3A//cqbbs.soufun.com/board/3110502342/%7C%7C%u9A8F%u9038%u7B2C%u4E00%u6C5F%u5CB8%7Chttp%3A//cqbbs.soufun.com/board/3110169184/%7C%7C%u793E%u533A%u4E4B%u661F%7Chttp%3A//cqbbs.soufun.com/board/sqzx/%7C%7C; SoufunSessionID=2y5xyr45kslc0zbdooqnoo55; viewUser=1; vjuids=-870e9088.11ee89aba57.0.be9c3d988def8; vjlast=1232263101,1232380806,11; new_viewtype=1; articlecolor=#000000; usersms_pop_type=1; articlecount=186; __utma=101868291.755195653.1232450942.1232450942.1232450942.1; __utmz=101868291.1232450942.1.1.utmccn=(referral)······

III. Implementation

1. Main program

import org.apache.spark.sql.SparkSession

/**
 * @Author bigdatalearnshare
 */
object IpLocation {

  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession
      .builder()
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .appName("test")
      .master("local[*]")
      .getOrCreate()

    val ipInfos = sparkSession.read.textFile("/bigdatalearnshare/location/ip.txt").rdd
      .map(_.split("\\|"))
      .map(info => (info(2), info(3), s"${info(6)}-${info(7)}-${info(9)}-${info(13)}-${info(14)}"))
      .collect()

    val ipBroadcast = sparkSession.sparkContext.broadcast(ipInfos)

    // format-http stores the collected IP access logs
    val locationIp = sparkSession.read.textFile("/bigdatalearnshare/location/format-http").rdd
      .map(_.split("\\|"))
      .map(_(1))
      .mapPartitions { iter =>
        val ipInfoVal = ipBroadcast.value
        iter.flatMap { ip =>
          val ipNum = ip2Num(ip)
          val index = binarySearch(ipInfoVal, ipNum)
          // binarySearch returns -1 for IPs outside every known range; skip them
          if (index >= 0) Some((ipInfoVal(index)._3, ip)) else None
        }
      }

    locationIp.map(x => (x._1, 1)).reduceByKey(_ + _)
      .foreachPartition(saveData2MySQL(_))

    sparkSession.stop()
  }
}

2. First, convert the IP address into a Long, so that we can determine which IP range a logged address falls into

/** Convert an IP string to a Long, e.g. 1.86.64.0 => 22429696 */
def ip2Num(ip: String): Long = {
  var ipNum = 0L
  ip.split("[.]").foreach { i =>
    ipNum = i.toLong | ipNum << 8L
  }
  ipNum
}
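The shift-and-OR loop is equivalent to reading the four octets as base-256 digits. A standalone sketch (plain Scala, no Spark) reproduces the 1.86.64.0 => 22429696 example from the comment:

```scala
object Ip2NumCheck {
  // Same logic as ip2Num above, written as a fold: shift the accumulator
  // left by 8 bits and OR in the next octet.
  def ip2Num(ip: String): Long =
    ip.split("[.]").foldLeft(0L)((acc, octet) => (acc << 8) | octet.toLong)

  def main(args: Array[String]): Unit = {
    // 1*256^3 + 86*256^2 + 64*256 + 0 = 22429696
    assert(ip2Num("1.86.64.0") == 22429696L)
  }
}
```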
3. Compare each log IP against the broadcast IP-range table; since the ranges are sorted, locate the matching range with binary search
/** The IP ranges are sorted, so use binary search */
def binarySearch(ipInfos: Array[(String, String, String)], ip: Long): Int = {
  var low = 0
  var high = ipInfos.length - 1
  while (low <= high) {
    val middle = (low + high) / 2
    if ((ip >= ipInfos(middle)._1.toLong) && (ip <= ipInfos(middle)._2.toLong)) return middle
    if (ip < ipInfos(middle)._1.toLong) high = middle - 1
    else low = middle + 1
  }
  -1
}
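A standalone usage sketch (plain Scala; the two numeric ranges are taken from the sample ip.txt data above) exercises both the hit and the miss case:

```scala
object BinarySearchCheck {
  // Same range lookup as binarySearch above: entries are
  // (startNum, endNum, location) tuples sorted by startNum.
  def binarySearch(ipInfos: Array[(String, String, String)], ip: Long): Int = {
    var low = 0
    var high = ipInfos.length - 1
    while (low <= high) {
      val middle = (low + high) / 2
      if (ip >= ipInfos(middle)._1.toLong && ip <= ipInfos(middle)._2.toLong) return middle
      if (ip < ipInfos(middle)._1.toLong) high = middle - 1 else low = middle + 1
    }
    -1
  }

  def main(args: Array[String]): Unit = {
    val ranges = Array(
      ("22429696", "22437887", "陕西-延安"),
      ("22437888", "22441983", "陕西-铜川"))
    assert(binarySearch(ranges, 22429700L) == 0) // falls inside the first range
    assert(binarySearch(ranges, 1L) == -1)       // below every range: not found
  }
}
```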
4. Persist the aggregated results to MySQL
import java.sql.{Connection, DriverManager, PreparedStatement}

def saveData2MySQL(iterator: Iterator[(String, Int)], batchSize: Int = 1000): Unit = {
  var conn: Connection = null
  var pst: PreparedStatement = null
  val sql = "INSERT INTO location_info (location, ip_count) VALUES (?, ?)"
  try {
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdatalearnshare", "root", "root")
    // In production, you should first query the Connection metadata to check
    // whether transactions are supported, and roll back on failure; that check
    // is omitted here for brevity.
    conn.setAutoCommit(false)
    pst = conn.prepareStatement(sql)
    var rowCount = 0
    iterator.foreach { case (location, count) =>
      pst.setString(1, location)
      pst.setInt(2, count)
      pst.addBatch()
      rowCount += 1
      if (rowCount % batchSize == 0) {
        pst.executeBatch()
        conn.commit()
        rowCount = 0
      }
    }
    // flush the final partial batch
    if (rowCount > 0) {
      pst.executeBatch()
      conn.commit()
    }
  } catch {
    case e: Exception => println(e)
  } finally {
    if (pst != null) pst.close()
    if (conn != null) conn.close()
  }
}
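The flush-every-batchSize bookkeeping in saveData2MySQL can be checked without a real database. The sketch below (a hypothetical flushes counter standing in for executeBatch/commit) replays the same rowCount logic:

```scala
object BatchFlushCheck {
  // Replays the rowCount bookkeeping from saveData2MySQL and counts
  // how many executeBatch/commit flushes it would issue.
  def countFlushes(records: Int, batchSize: Int): Int = {
    var rowCount = 0
    var flushes = 0
    (1 to records).foreach { _ =>
      rowCount += 1
      if (rowCount % batchSize == 0) {
        flushes += 1
        rowCount = 0
      }
    }
    if (rowCount > 0) flushes += 1 // final partial batch
    flushes
  }

  def main(args: Array[String]): Unit = {
    assert(countFlushes(2500, 1000) == 3) // 1000 + 1000 + 500
    assert(countFlushes(1000, 1000) == 1) // exactly one full batch
  }
}
```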


IV. Sample Results

(河北-石家庄-卫视创捷-114.502461-38.045474,383)
(云南-昆明-鹏博士-102.712251-25.040609,126)
(重庆-重庆-铁通-106.56347-29.52311,3)
(重庆-重庆-铁通-107.39007-29.70292,47)
(重庆-重庆-铁通-106.51107-29.50197,91)
(重庆-重庆-铁通-106.504962-29.533155,400)
(北京-北京-鹏博士-116.405285-39.904989,1535)
(重庆-重庆-铁通-106.57434-29.60658,177)
......

