查看原文
其他

网站日志实时分析之Flink处理实时热门和PVUV统计

点击上方蓝色字体,选择“设为星标

回复”资源“获取更多资源

大数据技术与架构点击右侧关注,大数据开发领域最强公众号!

暴走大数据点击右侧关注,暴走大数据!

实时热门统计

操作步骤:

  • 先从Kafka读取消费数据

  • 使用map算子对数据进行预处理

  • 过滤数据,只留住pv数据

  • 使用timewindow,每隔10秒创建一个20秒的window

  • 然后将窗口自定义预聚合,并且兹定于窗口函数,按指定输入输出case操作数据

  • 上面操作时候返回的是DataStream,那么就根据timestampEnd进行keyby

  • 使用底层API操作,对每个时间窗口内的数据进行排序,取top

package com.ongbo.hotAnalysis
import java.sql.Timestampimport java.util.Properties
import org.apache.flink.api.common.functions.AggregateFunctionimport org.apache.flink.api.common.serialization.SimpleStringSchemaimport org.apache.flink.api.common.state.{ListState, ListStateDescriptor}import org.apache.flink.api.java.tuple.Tupleimport org.apache.flink.configuration.Configurationimport org.apache.flink.streaming.api.TimeCharacteristicimport org.apache.flink.streaming.api.functions.KeyedProcessFunctionimport org.apache.flink.streaming.api.scala._import org.apache.flink.streaming.api.scala.function.WindowFunctionimport org.apache.flink.streaming.api.windowing.time.Timeimport org.apache.flink.streaming.api.windowing.windows.TimeWindowimport org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerimport org.apache.flink.util.Collector
import scala.collection.mutable.ListBuffer
/**定义输入数据的样例类 */case class UserBehavior(userId: Long, itemId: Long, cateGoryId: Int,behavior: String, timestamp: Long)//定义窗口聚合结果样例类case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)
object HotItems { def main(args: Array[String]): Unit = { //1:创建执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) //设置为事件事件 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //2:读取数据
/*kafka源*/ val properties = new Properties() properties.setProperty("bootstrap.servers","114.116.219.197:5008,114.116.220.98:5008,114.116.199.154:5008") properties.setProperty("group.id","web-consumer-group") properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer") properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer") properties.setProperty("auto.offset.reset","latest") val dataStream = env.addSource(new FlinkKafkaConsumer[String]("weblog", new SimpleStringSchema(),properties))// val dataStream = env.readTextFile("/Users/ongbo/Maven/bin/UserBehaviorAnalysis/HotItemAnalysis/src/main/resources/UserBehavior.csv") .map(data =>{ System.out.println("data:"+data) val dataArray = data.split(",")// if(dataArray(0).equals("ij")) UserBehavior(dataArray(0).trim.toLong, dataArray(1).trim.toLong, dataArray(2).trim.toInt, dataArray(3).trim, dataArray(4).trim.toLong)
}) .assignAscendingTimestamps(_.timestamp * 1000L)
//3:transform处理数据 val processStream = dataStream //筛选出埋点pv数据 .filter(_.behavior.equals("pv")) //先对itemID进行分组 .keyBy(_.itemId) //然后设置timeWindow,size为1小时,步长为5分钟的滑动窗口 .timeWindow(Time.seconds(20), Time.seconds(10)) //窗口聚合,按道理说应该不用窗口聚合,但是因为达到的数据可能时间顺序会扰乱,所以聚合后要keyby .aggregate(new CountAgg(), new WindowResult()) .keyBy(_.windowEnd) //按照窗口分组
.process(new TopNHotItems(10))

//sink:输出数据 processStream.print("processStream::")// dataStream.print() //执行 env.execute("hot Items Job")


}}
/*自定义预聚合函数*/class CountAgg() extends AggregateFunction[UserBehavior, Long, Long]{ //累加器初始值 override def createAccumulator(): Long = 0 //每来一次就加一 override def add(in: UserBehavior, acc: Long): Long = acc+1 // override def getResult(acc: Long): Long = acc
override def merge(acc: Long, acc1: Long): Long = acc + acc1}
//自定义窗口函数,输出ItemViewCountclass WindowResult() extends WindowFunction[Long,ItemViewCount, Long, TimeWindow]{ override def apply(key: Long, window: TimeWindow, input: Iterable[Long], out: Collector[ItemViewCount]): Unit = { out.collect(ItemViewCount(key,window.getEnd,input.iterator.next())) }}
//自定义处理函数class TopNHotItems(topsize: Int) extends KeyedProcessFunction[Long, ItemViewCount, String] { private var itemState: ListState[ItemViewCount] = _
override def open(parameters: Configuration): Unit = { itemState = getRuntimeContext.getListState(new ListStateDescriptor[ItemViewCount]("item-state", classOf[ItemViewCount]))
} override def processElement(value: ItemViewCount, ctx: KeyedProcessFunction[Long, ItemViewCount, String]#Context, out: Collector[String]): Unit = { //把每条数据存入状态列表 itemState.add(value) //注册一个定时器 ctx.timerService().registerEventTimeTimer(value.windowEnd + 1) } //定时器触发时,对所有的数据排序,并输出结果 override def onTimer(timestamp: Long, ctx: _root_.org.apache.flink.streaming.api.functions.KeyedProcessFunction[Long, _root_.com.ongbo.hotAnalysis.ItemViewCount, _root_.scala.Predef.String]#OnTimerContext, out: _root_.org.apache.flink.util.Collector[_root_.scala.Predef.String]): Unit = { //将所有state中的数据取出,放到一个list Buffer中 val allItems: ListBuffer[ItemViewCount] = new ListBuffer() import scala.collection.JavaConversions._ for(item <- itemState.get()){ allItems += item }
//按照点计量count大小排序,sortBy默认是升序,并且取前三个 val sortedItems = allItems.sortBy(_.count)(Ordering.Long.reverse).take(topsize)
//清空状态 itemState.clear()
//格式化输出排名结果 val result : StringBuilder = new StringBuilder result.append("时间:").append(new Timestamp(timestamp - 1)).append("\n") //输出每一个商品信息 for(i<- sortedItems.indices){ val currentItem = sortedItems(i) result.append("No").append(i+1).append(":") .append(" 商品ID:").append(currentItem.itemId) .append(" 浏览量:").append(currentItem.count).append("\n") } result.append("============================\n") //控制输出频率 Thread.sleep(1000) out.collect(result.toString()) }}/*自定义预聚合函数计算平均数*/class AverageAgg() extends AggregateFunction[UserBehavior, (Long,Int), Double]{ override def createAccumulator(): (Long, Int) = (0L,0)
override def add(in: UserBehavior, acc: (Long, Int)): (Long, Int) = (acc._1+in.timestamp, acc._2+1)
override def getResult(acc: (Long, Int)): Double = acc._1 /acc._2
override def merge(acc: (Long, Int), acc1: (Long, Int)): (Long, Int) = (acc._1+acc1._1, acc._2+acc1._2)}

实时PV统计

这里按道理应该也要从Kafka读取数据的,但是这里暂时先从本地读,因为当时本地网络的原因,暂时不在服务器上创建数据,而直接用本地的。
这个很简单,直接创建滚动窗口,从而能够计算一个小时的PV,然后每隔一个小时更新一次。
package com.ongbo.NetWorkFlow_Analysis
import org.apache.flink.streaming.api.TimeCharacteristicimport org.apache.flink.streaming.api.scala._import org.apache.flink.streaming.api.windowing.time.Time
/**定义输入数据的样例类 */case class UserBehavior(userId: Long, itemId: Long, cateGoryId: Int,behavior: String, timestamp: Long)
object PageVies { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1)
//用相对路径定义数据集 val resource = getClass.getResource("/UserBehavior.csv") val dataStream = env.readTextFile(resource.getPath) .map(data =>{ val dataArray = data.split(",") UserBehavior(dataArray(0).trim.toLong,dataArray(1).trim.toLong,dataArray(2).trim.toInt,dataArray(3).trim,dataArray(4).trim.toLong) }) .assignAscendingTimestamps(_.timestamp * 1000L) .filter(_.behavior.equals("pv")) .map(data => ("pv", 1)) .keyBy(_._1) .timeWindow(Time.hours(1)) .sum(1) dataStream.print("pv count") env.execute("PV") }}

实时UV统计:布隆过滤器

我们统计UV需要注意,很多重复的user会占用到内存,所以我们采用布隆过滤器优化,减少Flink缓存user从而降低性能。而且将数据count保存在Redis,可以给后端使用的。
package com.ongbo.NetWorkFlow_Analysis
import com.ongbo.NetWorkFlow_Analysis.UniqueView.getClassimport org.apache.flink.streaming.api.TimeCharacteristicimport org.apache.flink.streaming.api.functions.ProcessFunctionimport org.apache.flink.streaming.api.scala._import org.apache.flink.streaming.api.scala.function.ProcessWindowFunctionimport org.apache.flink.streaming.api.windowing.time.Timeimport org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}import org.apache.flink.streaming.api.windowing.windows.TimeWindowimport org.apache.flink.util.Collectorimport redis.clients.jedis.Jedis
object UvWithBloom { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1)
//用相对路径定义数据集 val resource = getClass.getResource("/UserBehavior.csv") val dataStream = env.readTextFile("/Users/ongbo/Maven/bin/UserBehaviorAnalysis/NetWorkFlowAnalysis/src/main/resources/UserBehavior.csv") .map(data =>{ val dataArray = data.split(",") UserBehavior(dataArray(0).trim.toLong,dataArray(1).trim.toLong,dataArray(2).trim.toInt,dataArray(3).trim,dataArray(4).trim.toLong) }) .assignAscendingTimestamps(_.timestamp * 1000L) .filter(_.behavior.equals("pv")) .map( data => ("dummyKey",data.userId)) .keyBy(_._1) .timeWindow(Time.hours(1)) .trigger(new MyTrigger()) .process(new UvCountWithBloom())
dataStream.print() env.execute() }}

//自定义窗口触发器class MyTrigger() extends Trigger[(String,Long),TimeWindow]{ override def onElement(element: (String, Long), timestamp: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = { //每来一条数据就直接触发窗口操作,并清空所有状态 TriggerResult.FIRE_AND_PURGE }
override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE
override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE
override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {}}class UvCountWithBloom() extends ProcessWindowFunction[(String,Long),UvCount,String, TimeWindow] { // 定义Redis连接 lazy val jedis = new Jedis("114.116.219.97",5000) //29位,也就是64M lazy val bloom = new Bloom(1 << 29) override def process(key: String, context: Context, elements: Iterable[(String, Long)], out: Collector[UvCount]): Unit = { //位图的存储方式 , key是windowwen,value是位图 val storeKey = context.window.getEnd.toString var count = 0L //把每个窗口的count值,也存入Redis表里,存放内容位(windowEnd,uccount),所以要先从Redis中读取

if(jedis.hget("count",storeKey) != null){// System.out.println(v) count = jedis.hget("count",storeKey).toLong } //用布隆过滤器判断当前用户是否已经存在 val userId = elements.last._2.toString val offset = bloom.hash(userId, 61) //定义一个标志位,判断Redis位图中有没有这一位 val isExist = jedis.getbit(storeKey, offset) if(!isExist){ //如果不存在位图对应位置变成1,count+1 jedis.setbit(storeKey,offset,true) jedis.hset("count",storeKey,(count+1).toString) out.collect(UvCount(storeKey.toLong,count+1)) }else{ out.collect(UvCount(storeKey.toLong,count)) } }}
class Bloom(size: Long) extends Serializable{ //位图大小 private val cap = if(size>0) size else 1 << 27
//定义Hash函数 def hash(value: String, seed: Int) : Long = { var result:Long = 0L for(i <- 0 until value.length){ result = result * seed + value.charAt(i) } result & (cap-1) }}
欢迎点赞+收藏+转发朋友圈素质三连



文章不错?点个【在看】吧! 👇

: . Video Mini Program Like ,轻点两下取消赞 Wow ,轻点两下取消在看

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存