Spark: Offline Statistics of Hot-City Information
// IP range start | IP range end | start IP (numeric) | end IP (numeric) | | longitude/latitude
1.86.64.0|1.86.95.255|22429696|22437887|亚洲|中国|陕西|延安||电信|610600|China|CN|109.49081|36.596537
1.86.96.0|1.86.111.255|22437888|22441983|亚洲|中国|陕西|铜川||电信|610200|China|CN|108.963122|34.90892
1.86.112.0|1.86.127.255|22441984|22446079|亚洲|中国|陕西|咸阳||电信|610400|China|CN|108.705117|34.333439
1.86.192.0|1.86.223.255|22462464|22470655|亚洲|中国|陕西|安康||电信|610900|China|CN|109.029273|32.6903
1.87.152.0|1.87.175.255|22517760|22523903|亚洲|中国|陕西|西安||电信|610100|China|CN|108.948024|34.263161
1.87.176.0|1.87.191.255|22523904|22527999|亚洲|中国|陕西|咸阳||电信|610400|China|CN|108.705117|34.333439
1.88.0.0|1.91.255.255|22544384|22806527|亚洲|中国|北京|北京||歌华有线|110100|China|CN|116.405285|39.904989
1.92.0.0|1.93.255.255|22806528|22937599|亚洲|中国|北京|北京||鹏博士|110100|China|CN|116.405285|39.904989
······
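As a rough illustration of how one row of this table can be turned into a lookup record, the sketch below splits a sample row on `|` and keeps the numeric range bounds plus a location label. The names `ParseIpRowDemo` and `parseIpRow` are hypothetical and chosen only for this example; the field positions (2, 3, 6, 7, 9, 13, 14) follow the ones used in the main program of this article.

```scala
object ParseIpRowDemo {
  // First sample row from the IP table above.
  val sampleRow: String =
    "1.86.64.0|1.86.95.255|22429696|22437887|亚洲|中国|陕西|延安||电信|610600|China|CN|109.49081|36.596537"

  /** Splits one row into (start IP as Long, end IP as Long, location label). */
  def parseIpRow(line: String): (Long, Long, String) = {
    // "|" is a regex metacharacter, so it has to be escaped.
    val f = line.split("\\|")
    (f(2).toLong, f(3).toLong, s"${f(6)}-${f(7)}-${f(9)}-${f(13)}-${f(14)}")
  }

  def main(args: Array[String]): Unit = {
    // prints (22429696,22437887,陕西-延安-电信-109.49081-36.596537)
    println(parseIpRow(sampleRow))
  }
}
```

Parsing the bounds to `Long` once at load time is a design choice of this sketch; the main program keeps them as strings and converts them during the search.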
// We mainly care about the IP field, e.g. 115.120.36.118, which usually corresponds to base-station data
20090121000132581311000|115.120.36.118|tj.tt98.com|/tj.htm|Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; TheWorld)|http://www.tt98.com/|
20090121000132864647000|123.197.64.247|cul.sohu.com|/20071227/n254338813_22.shtml|Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; TheWorld)|http://cul.sohu.com/20071227/n254338813_22.shtml|ArticleTab=visit:1; IPLOC=unknown; SUV=0901080709152121; vjuids=832dd37a1.11ebbc5d590.0.b20f858f14e918; club_chat_ircnick=JaabvxC4aaacQ; spanel=%7B%22u%22%3A%22%22%7D; vjlast=1232467312,1232467312,30
20090121000133296729000|222.55.57.176|down.chinaz.com|/|Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; iCafeMedia; TencentTraveler 4.0)||cnzz_a33219=0; vw33219=%3A18167791%3A; sin33219=http%3A//www.itxls.com/wz/wyfx/it.html; rtime=0; ltime=1232464387281; cnzz_eid=6264952-1232464379-http%3A//www.itxls.com/wz/wyfx/it.html
20090121000133331104000|123.197.66.93|www.pkwutai.cn|/down/downLoad-id-45383.html|Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; QQDownload 1.7)|http://www.baidu.com/s?tn=b1ank_pg&ie=gb2312&bs=%C3%C0%C6%BC%B7%FE%D7%B0%B9%DC%C0%ED%C8%ED%BC%FE&sr=&z=&cl=3&f=8&wd=%C6%C6%BD%E2%C3%C0%C6%BC%B7%FE%D7%B0%B9%DC%C0%ED%C8%ED%BC%FE&ct=0|
20090121000133446262000|115.120.12.157|v.ifeng.com|/live/|Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1) ; .NET CLR 2.0.50727; CIBA)|http://www.ifeng.com/|userid=1232466610953_4339; location=186; sclocationid=10002; vjuids=22644b162.11ef4bc1624.0.63ad06717b426; vjlast=1232466614,1232467297,13
20090121000133456256000|115.120.7.240|cqbbs.soufun.com|/3110502342~-1~2118/23004348_23004348.htm|Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1) ; .NET CLR 2.0.50727; CIBA)||new_historysignlist=%u534E%u6DA6%u4E8C%u5341%u56DB%u57CE%7Chttp%3A//cqbbs.soufun.com/board/3110502342/%7C%7C%u9A8F%u9038%u7B2C%u4E00%u6C5F%u5CB8%7Chttp%3A//cqbbs.soufun.com/board/3110169184/%7C%7C%u793E%u533A%u4E4B%u661F%7Chttp%3A//cqbbs.soufun.com/board/sqzx/%7C%7C; SoufunSessionID=2y5xyr45kslc0zbdooqnoo55; viewUser=1; vjuids=-870e9088.11ee89aba57.0.be9c3d988def8; vjlast=1232263101,1232380806,11; new_viewtype=1; articlecolor=#000000; usersms_pop_type=1; articlecount=186; __utma=101868291.755195653.1232450942.1232450942.1232450942.1; __utmz=101868291.1232450942.1.1.utmccn=(referral)
······
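The only field the job needs from each log line is the second one. A minimal sketch of pulling it out defensively could look like this. `LogIpDemo` and `extractIp` are hypothetical names, and the dotted-quad check is a loose shape check added for illustration, not something the original job performs.

```scala
object LogIpDemo {
  // Loose IPv4 shape check: four groups of 1 to 3 digits.
  // It does not verify that each octet is <= 255.
  private val Ipv4Shape = """\d{1,3}(\.\d{1,3}){3}"""

  /** Returns the IP field (index 1) of a log line, or None if it is missing or malformed. */
  def extractIp(line: String): Option[String] = {
    val fields = line.split("\\|")
    if (fields.length > 1 && fields(1).matches(Ipv4Shape)) Some(fields(1)) else None
  }

  def main(args: Array[String]): Unit = {
    val line = "20090121000132581311000|115.120.36.118|tj.tt98.com|/tj.htm|" +
      "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; TheWorld)|http://www.tt98.com/|"
    println(extractIp(line))          // prints Some(115.120.36.118)
    println(extractIp("broken line")) // prints None
  }
}
```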
III. Code Implementation
1. Main program
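import org.apache.spark.sql.SparkSession

/**
 * @Author bigdatalearnshare
 */
object IpLocation {
  // ip2Num, binarySearch and saveData2MySQL (listed after the main program) are members of this object.
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession
      .builder()
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .appName("test")
      .master("local[*]")
      .getOrCreate()

    // Load the IP table on the driver as (startNum, endNum, "province-city-ISP-longitude-latitude")
    val ipInfos = sparkSession.read.textFile("/bigdatalearnshare/location/ip.txt").rdd
      .map(_.split("\\|"))
      .map(info => (info(2), info(3), s"${info(6)}-${info(7)}-${info(9)}-${info(13)}-${info(14)}"))
      .collect()
    // Broadcast the table so every executor holds one read-only copy
    val ipBroadcast = sparkSession.sparkContext.broadcast(ipInfos)

    // format-http stores the collected IP access logs
    val locationIp = sparkSession.read.textFile("/bigdatalearnshare/location/format-http").rdd
      .map(_.split("\\|"))
      .map(_(1))
      .mapPartitions { iter =>
        val ipInfoVal = ipBroadcast.value
        iter.flatMap { ip =>
          val ipNum = ip2Num(ip)
          val index = binarySearch(ipInfoVal, ipNum)
          // binarySearch returns -1 when no range contains the IP; skip those records
          if (index >= 0) Some((ipInfoVal(index)._3, ip)) else None
        }
      }

    // Count hits per location and write each partition to MySQL
    locationIp.map(x => (x._1, 1)).reduceByKey(_ + _)
      .foreachPartition(saveData2MySQL(_))

    sparkSession.stop()
  }
}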
2. First, convert the IP address to a Long, which makes it easy to determine which IP range an IP from the user logs falls into
/** Converts an IP to a Long, e.g. 1.86.64.0 => 22429696 */
def ip2Num(ip: String): Long = {
  var ipNum = 0L
  ip.split("[.]")
    .foreach { i =>
      // Shift the accumulated value left by one octet, then OR in the next octet
      ipNum = i.toLong | (ipNum << 8)
    }
  ipNum
}
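To make the arithmetic concrete: 1.86.64.0 becomes 1·2^24 + 86·2^16 + 64·2^8 + 0 = 22429696, which matches the numeric start column of the first sample row in the IP table. The sketch below is an equivalent formulation without a mutable variable, using `foldLeft`. The names `IpToLongDemo` and `ipToLong` are hypothetical and are not part of the original program.

```scala
object IpToLongDemo {
  /** Same conversion as ip2Num, written as a fold over the four octets. */
  def ipToLong(ip: String): Long =
    ip.split("[.]").foldLeft(0L) { (acc, octet) =>
      // Make room for the next octet, then OR it into the low 8 bits.
      (acc << 8) | octet.toLong
    }

  def main(args: Array[String]): Unit = {
    println(ipToLong("1.86.64.0"))   // prints 22429696
    println(ipToLong("1.86.95.255")) // prints 22437887
  }
}
```

Both values agree with the numeric columns of the first sample row, which is a quick sanity check that the log IPs and the table use the same encoding.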
/** The IP ranges are sorted, so a binary search is used. Returns the index of the range containing ip, or -1 if there is none. */
def binarySearch(ipInfos: Array[(String, String, String)], ip: Long): Int = {
  var low = 0
  var high = ipInfos.length - 1
  while (low <= high) {
    // Written this way to avoid Int overflow of (low + high) on very large arrays
    val middle = low + (high - low) / 2
    if ((ip >= ipInfos(middle)._1.toLong) && (ip <= ipInfos(middle)._2.toLong))
      return middle
    if (ip < ipInfos(middle)._1.toLong) high = middle - 1 else low = middle + 1
  }
  -1
}
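The IP table has gaps between ranges (for example between 1.86.127.255 and 1.86.192.0 in the sample), so a lookup can legitimately find nothing. One possible variant, sketched here under the assumption that the bounds are parsed to `Long` once up front, returns an `Option` instead of a `-1` sentinel. `IpRange`, `IpRangeLookupDemo` and `lookupLocation` are hypothetical names introduced for this example.

```scala
object IpRangeLookupDemo {
  // Hypothetical record type: bounds are stored as Long, so no toLong per comparison.
  final case class IpRange(start: Long, end: Long, label: String)

  // Three rows taken from the sample IP table; the array must stay sorted by start.
  val ranges: Array[IpRange] = Array(
    IpRange(22429696L, 22437887L, "陕西-延安-电信-109.49081-36.596537"),
    IpRange(22437888L, 22441983L, "陕西-铜川-电信-108.963122-34.90892"),
    IpRange(22544384L, 22806527L, "北京-北京-歌华有线-116.405285-39.904989")
  )

  /** Binary search that returns the label of the containing range, or None. */
  def lookupLocation(ranges: Array[IpRange], ip: Long): Option[String] = {
    var low = 0
    var high = ranges.length - 1
    while (low <= high) {
      val middle = low + (high - low) / 2
      val r = ranges(middle)
      if (ip < r.start) high = middle - 1
      else if (ip > r.end) low = middle + 1
      else return Some(r.label)
    }
    None
  }

  def main(args: Array[String]): Unit = {
    println(lookupLocation(ranges, 22437888L)) // prints Some(陕西-铜川-电信-108.963122-34.90892)
    println(lookupLocation(ranges, 22500000L)) // falls in a gap: prints None
  }
}
```

Returning `Option` pushes the "not found" case into the type, so a caller cannot index the array with `-1` by accident.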
import java.sql.{Connection, DriverManager, PreparedStatement}

/** Writes one partition of (location, count) pairs to MySQL in batches of batchSize rows. */
def saveData2MySQL(iterator: Iterator[(String, Int)], batchSize: Int = 1000): Unit = {
  var conn: Connection = null
  var pst: PreparedStatement = null
  val sql = "INSERT INTO location_info (location, ip_count) VALUES (?, ?)"
  try {
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdatalearnshare", "root", "root")
    // In production, for safety, first obtain the metadata through the Connection API to check
    // whether transactions are supported, and roll back on failure. For simplicity, those checks are omitted here.
    conn.setAutoCommit(false)
    pst = conn.prepareStatement(sql)
    var rowCount = 0
    iterator.foreach { case (location, count) =>
      pst.setString(1, location)
      pst.setInt(2, count)
      pst.addBatch()
      rowCount += 1
      // Flush and commit every batchSize rows
      if (rowCount % batchSize == 0) {
        pst.executeBatch()
        conn.commit()
        rowCount = 0
      }
    }
    // Flush whatever is left in the last, partially filled batch
    if (rowCount > 0) {
      pst.executeBatch()
      conn.commit()
    }
  } catch {
    case e: Exception => println(e)
  } finally {
    if (pst != null) pst.close()
    if (conn != null) conn.close()
  }
}
IV. Sample Results
(河北-石家庄-卫视创捷-114.502461-38.045474,383)
(云南-昆明-鹏博士-102.712251-25.040609,126)
(重庆-重庆-铁通-106.56347-29.52311,3)
(重庆-重庆-铁通-107.39007-29.70292,47)
(重庆-重庆-铁通-106.51107-29.50197,91)
(重庆-重庆-铁通-106.504962-29.533155,400)
(北京-北京-鹏博士-116.405285-39.904989,1535)
(重庆-重庆-铁通-106.57434-29.60658,177)
......
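The counts above come out in no particular order. To read off the hottest locations, the pairs can be sorted by count in descending order. The sketch below does this on a plain Scala collection using the sample values listed above; `TopLocationsDemo` and `topN` are hypothetical names, and this ranking step is an illustration rather than part of the original job.

```scala
object TopLocationsDemo {
  // (location, count) pairs copied from the sample results above.
  val sample: Seq[(String, Int)] = Seq(
    ("河北-石家庄-卫视创捷-114.502461-38.045474", 383),
    ("云南-昆明-鹏博士-102.712251-25.040609", 126),
    ("重庆-重庆-铁通-106.56347-29.52311", 3),
    ("重庆-重庆-铁通-107.39007-29.70292", 47),
    ("重庆-重庆-铁通-106.51107-29.50197", 91),
    ("重庆-重庆-铁通-106.504962-29.533155", 400),
    ("北京-北京-鹏博士-116.405285-39.904989", 1535),
    ("重庆-重庆-铁通-106.57434-29.60658", 177)
  )

  /** Returns the n locations with the highest counts, largest first. */
  def topN(counts: Seq[(String, Int)], n: Int): Seq[(String, Int)] =
    counts.sortBy { case (_, count) => -count }.take(n)

  def main(args: Array[String]): Unit = {
    // Prints the entries with counts 1535, 400 and 383, in that order.
    topN(sample, 3).foreach(println)
  }
}
```

On the RDD produced by `reduceByKey`, a comparable result could presumably be obtained with `sortBy(_._2, ascending = false)` followed by `take(n)`, before or instead of writing to MySQL.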