Using Broadcast State in Flink to Update Stream-Processing Configuration in Real Time
If the data stream is a keyed stream, then after connecting it to the broadcast stream you must implement the processing logic with KeyedBroadcastProcessFunction. Its API is shown below:
public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> extends BaseBroadcastProcessFunction {
public abstract void processElement(final IN1 value, final ReadOnlyContext ctx, final Collector<OUT> out) throws Exception;
public abstract void processBroadcastElement(final IN2 value, final Context ctx, final Collector<OUT> out) throws Exception;
}
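KS: the type of the key used when keyBy is called on the stream built from the upstream source operator;
IN1: the type of the records in the non-broadcast data stream;
IN2: the type of the records in the broadcast stream;
OUT: the type of the records emitted by the processElement() and processBroadcastElement() methods of KeyedBroadcastProcessFunction.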
If the data stream is not keyed, BroadcastProcessFunction is used instead. Its API is shown below:
public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {
public abstract void processElement(final IN1 value, final ReadOnlyContext ctx, final Collector<OUT> out) throws Exception;
public abstract void processBroadcastElement(final IN2 value, final Context ctx, final Collector<OUT> out) throws Exception;
}
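To show how these functions are used in practice, we walk through a complete program below, using KeyedBroadcastProcessFunction as the example.
We track the events produced by users operating a mobile app and trigger predefined actions in real time based on what they do. Suppose we care about a user who, after a series of operations in the app (viewing several products, adding viewed products to the cart, removing products from the cart, and so on), finally makes a purchase. We define the number of operations the user performs from the start until the purchase is completed as the user's purchase path length. Our working assumption is that operational campaigns built on this metric, such as pushing discount privileges or reminding users to use the app at the right moment, can raise the repurchase rate, which is the goal we want to reach.
All events are collected in real time in a fixed format, and we use JSON throughout. We define the following types of user operations in the app: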
VIEW_PRODUCT
ADD_TO_CART
REMOVE_FROM_CART
PURCHASE
A few sample event records are shown below:
{"userId":"d8f3368aba5df27a39cbcfd36ce8084f","channel":"APP","eventType":"VIEW_PRODUCT","eventTime":"2018-06-12_09:27:11","data":{"productId":196}}
{"userId":"d8f3368aba5df27a39cbcfd36ce8084f","channel":"APP","eventType":"ADD_TO_CART","eventTime":"2018-06-12_09:43:18","data":{"productId":126}}
{"userId":"d8f3368aba5df27a39cbcfd36ce8084f","channel":"APP","eventType":"VIEW_PRODUCT","eventTime":"2018-06-12_09:27:11","data":{"productId":126}}
{"userId":"d8f3368aba5df27a39cbcfd36ce8084f","channel":"APP","eventType":"PURCHASE","eventTime":"2018-06-12_09:30:28","data":{"productId":196,"price":600.00,"amount":600.00}}
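The processing code later calls getEventTimestamp() on each event, but the article does not show how the eventTime string is converted. A minimal sketch of one way to do it in plain Java follows. The class name EventTimeParser is hypothetical, and treating the timestamps as UTC is an assumption; substitute the real time zone if the events carry local times.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of the original job.
final class EventTimeParser {
    // Matches the sample records, e.g. "2018-06-12_09:27:11".
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd_HH:mm:ss");

    private EventTimeParser() {}

    // Assumption: event times are expressed in UTC.
    static long toEpochMillis(String eventTime) {
        return LocalDateTime.parse(eventTime, FORMAT)
                .toInstant(ZoneOffset.UTC)
                .toEpochMilli();
    }

    public static void main(String[] args) {
        long view = toEpochMillis("2018-06-12_09:27:11");
        long cart = toEpochMillis("2018-06-12_09:43:18");
        // Milliseconds between the VIEW_PRODUCT and ADD_TO_CART sample events.
        System.out.println(cart - view);
    }
}
```

A value produced this way could serve as the event timestamp that the watermark extractor reads.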
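The configuration for a channel is also a JSON record, for example:
{"channel":"APP","registerDate":"2018-01-01","historyPurchaseTimes":0,"maxPurchasePathLength":3}
For a user whose purchase path length exceeds the configured maximum, we compute the path length and write it to another Kafka topic so that other systems can consume that topic and run personalized campaigns for these users. Besides the purchase path length, the result also counts each type of operation performed on the way to the purchase. The JSON string looks like this: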
{"userId":"a9b83681ba4df17a30abcf085ce80a9b","channel":"APP","purchasePathLength":9,"eventTypeCounts":{"ADD_TO_CART":1,"PURCHASE":1,"VIEW_PRODUCT":7}}
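The rule described above can be sketched in plain Java, independent of Flink. The class and method names here are hypothetical, and the sketch covers only the path-length threshold and the per-type counts; the job's full compute() method, shown later, also checks historyPurchaseTimes.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Plain-Java illustration of the rule; names are hypothetical.
final class PurchasePathRule {
    private PurchasePathRule() {}

    // Returns the per-event-type counts when the path is longer than the
    // configured maximum, otherwise an empty Optional (nothing is emitted).
    static Optional<Map<String, Integer>> evaluate(List<String> eventTypes,
                                                   int maxPurchasePathLength) {
        if (eventTypes.size() <= maxPurchasePathLength) {
            return Optional.empty();
        }
        Map<String, Integer> counts = new TreeMap<>(); // sorted keys, like the sample output
        for (String type : eventTypes) {
            counts.merge(type, 1, Integer::sum);
        }
        return Optional.of(counts);
    }

    public static void main(String[] args) {
        List<String> path = Arrays.asList(
                "VIEW_PRODUCT", "VIEW_PRODUCT", "VIEW_PRODUCT", "VIEW_PRODUCT",
                "VIEW_PRODUCT", "VIEW_PRODUCT", "ADD_TO_CART", "VIEW_PRODUCT", "PURCHASE");
        // prints Optional[{ADD_TO_CART=1, PURCHASE=1, VIEW_PRODUCT=7}]
        System.out.println(evaluate(path, 6));
        // prints Optional.empty
        System.out.println(evaluate(path, 20));
    }
}
```

With nine events, a maximum of 6 yields a result and a maximum of 20 yields none, which is the behaviour a configuration update is meant to switch between.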
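User operation events are written in real time to a Kafka topic, specified by the input-event-topic parameter.
From the topic specified by input-event-topic, create a Flink source operator named kafkaUserEventSource.
From kafkaUserEventSource, create a data stream named customerUserEventStream.
Channel configuration is updated as the business requires and written in real time to a Kafka topic, specified by the input-config-topic parameter.
From the topic specified by input-config-topic, create a Flink source operator named kafkaConfigEventSource.
From kafkaConfigEventSource, create a broadcast stream named configBroadcastStream.
Connect customerUserEventStream to configBroadcastStream to obtain a new stream, connectedStream.
Attach a ProcessFunction implementation to connectedStream to process its records. Every task receives the same configuration and uses it to process user events.
Send the processing results to a Flink sink operator named kafkaSink.
kafkaSink writes the results to a Kafka topic whose name is given by output-topic.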
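The key property of the steps above is that a configuration record reaches every parallel task, while a user event reaches only the task that owns its key. The toy model below illustrates this with plain collections. It does not use Flink, all names are hypothetical, and the hash-based routing is a simplification of how Flink actually assigns keys to tasks.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of the topology; it does not use Flink.
final class BroadcastModel {
    // One parallel task instance with its own copy of the broadcast state.
    static final class Task {
        final Map<String, Integer> maxPathLengthByChannel = new HashMap<>();
    }

    private final List<Task> tasks = new ArrayList<>();

    BroadcastModel(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            tasks.add(new Task());
        }
    }

    // A config record is delivered to every task, so all copies stay identical.
    void broadcast(String channel, int maxPurchasePathLength) {
        for (Task task : tasks) {
            task.maxPathLengthByChannel.put(channel, maxPurchasePathLength);
        }
    }

    // A keyed record goes to exactly one task (simplified: chosen from the key's hash).
    Task route(String userId) {
        return tasks.get(Math.floorMod(userId.hashCode(), tasks.size()));
    }

    // What the task that owns this user currently sees for the channel.
    Integer lookup(String userId, String channel) {
        return route(userId).maxPathLengthByChannel.get(channel);
    }

    public static void main(String[] args) {
        BroadcastModel model = new BroadcastModel(4);
        model.broadcast("APP", 6);
        System.out.println(model.lookup("d8f3368aba5df27a39cbcfd36ce8084f", "APP")); // 6
        model.broadcast("APP", 20);
        System.out.println(model.lookup("a9b83681ba4df17a30abcf085ce80a9b", "APP")); // 20
    }
}
```

Whichever task a user is routed to, the lookup returns the most recently broadcast value, which is what lets the job change its threshold without a restart.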
Input parameters
LOG.info("Input args: " + Arrays.asList(args));
// parse input arguments
final ParameterTool parameterTool = ParameterTool.fromArgs(args);
if (parameterTool.getNumberOfParameters() < 5) {
System.out.println("Missing parameters!\n" +
"Usage: Kafka --input-event-topic <topic> --input-config-topic <topic> --output-topic <topic> " +
"--bootstrap.servers <kafka brokers> " +
"--zookeeper.connect <zk quorum> --group.id <some id>");
return;
}
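The check above only counts parameters, so a command line that supplies five of the six options named in the usage message would still pass it. A minimal sketch of checking the options by name is shown below. RequiredArgs is a hypothetical helper written in plain Java, and its parse() method is a simplified stand-in for the "--key value" handling that ParameterTool provides, not a replacement for it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the original job.
final class RequiredArgs {
    // The six options named in the usage message.
    static final List<String> REQUIRED = Arrays.asList(
            "input-event-topic", "input-config-topic", "output-topic",
            "bootstrap.servers", "zookeeper.connect", "group.id");

    private RequiredArgs() {}

    // Parses strictly alternating "--key value" arguments into a map.
    static Map<String, String> parse(String[] args) {
        if (args.length % 2 != 0) {
            throw new IllegalArgumentException("Expected --key value pairs");
        }
        Map<String, String> params = new HashMap<>();
        for (int i = 0; i < args.length; i += 2) {
            if (!args[i].startsWith("--")) {
                throw new IllegalArgumentException("Not an option: " + args[i]);
            }
            params.put(args[i].substring(2), args[i + 1]);
        }
        return params;
    }

    // Returns the required option names that are absent, in usage-message order.
    static List<String> missing(Map<String, String> params) {
        List<String> absent = new ArrayList<>();
        for (String name : REQUIRED) {
            if (!params.containsKey(name)) {
                absent.add(name);
            }
        }
        return absent;
    }

    public static void main(String[] args) {
        List<String> absent = missing(parse(args));
        if (!absent.isEmpty()) {
            System.out.println("Missing parameters: " + absent);
        }
    }
}
```

Reporting the missing names also gives the operator a more useful message than the generic usage text.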
Configuring the Flink environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend(
"hdfs://namenode01.td.com/flink-checkpoints/customer-purchase-behavior-tracker"));
CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
config.setCheckpointInterval(1 * 60 * 60 * 1000);
env.getConfig().setGlobalJobParameters(parameterTool); // make parameters available in the web interface
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
private static class CustomWatermarkExtractor extends BoundedOutOfOrdernessTimestampExtractor<UserEvent> {
public CustomWatermarkExtractor(Time maxOutOfOrderness) {
super(maxOutOfOrderness);
}
@Override
public long extractTimestamp(UserEvent element) {
return element.getEventTimestamp();
}
}
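CustomWatermarkExtractor relies on Flink's BoundedOutOfOrdernessTimestampExtractor, which tolerates events that arrive out of order by up to a fixed bound. As a rough illustration of the idea, the watermark trails the largest timestamp seen so far by that bound. The class below is a plain-Java model written for this explanation with hypothetical names; it is not Flink's implementation.

```java
// Plain-Java model of a bounded-out-of-orderness watermark; not Flink code.
final class BoundedOutOfOrdernessModel {
    private final long maxOutOfOrdernessMillis;
    private long currentMaxTimestamp;

    BoundedOutOfOrdernessModel(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
        // Start low enough that the first subtraction cannot overflow.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis;
    }

    // Records an event timestamp; late events never move the maximum backwards.
    void observe(long eventTimestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
    }

    // Events with a timestamp at or below this value are considered complete.
    long currentWatermark() {
        return currentMaxTimestamp - maxOutOfOrdernessMillis;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessModel model = new BoundedOutOfOrdernessModel(3_000L);
        model.observe(10_000L);
        model.observe(4_000L); // out of order, does not lower the watermark
        System.out.println(model.currentWatermark()); // 7000
    }
}
```

A larger bound accepts later events at the cost of holding back event-time progress for longer.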
Creating the user event stream
// create customer user event stream
final FlinkKafkaConsumer010<String> kafkaUserEventSource = new FlinkKafkaConsumer010<>(
parameterTool.getRequired("input-event-topic"),
new SimpleStringSchema(), parameterTool.getProperties());
// (userEvent, userId)
final KeyedStream<UserEvent, String> customerUserEventStream = env
.addSource(kafkaUserEventSource)
.map(new MapFunction<String, UserEvent>() {
@Override
public UserEvent map(String s) throws Exception {
return UserEvent.buildEvent(s);
}
})
.assignTimestampsAndWatermarks(new CustomWatermarkExtractor(Time.hours(24)))
.keyBy(new KeySelector<UserEvent, String>() {
@Override
public String getKey(UserEvent userEvent) throws Exception {
return userEvent.getUserId();
}
});
Creating the configuration event stream
// create dynamic configuration event stream
final FlinkKafkaConsumer010<String> kafkaConfigEventSource = new FlinkKafkaConsumer010<>(
parameterTool.getRequired("input-config-topic"),
new SimpleStringSchema(), parameterTool.getProperties());
final BroadcastStream<Config> configBroadcastStream = env
.addSource(kafkaConfigEventSource)
.map(new MapFunction<String, Config>() {
@Override
public Config map(String value) throws Exception {
return Config.buildConfig(value);
}
})
.broadcast(configStateDescriptor);
private static final MapStateDescriptor<String, Config> configStateDescriptor =
new MapStateDescriptor<>(
"configBroadcastState",
BasicTypeInfo.STRING_TYPE_INFO,
TypeInformation.of(new TypeHint<Config>() {}));
private String channel;
private String registerDate;
private int historyPurchaseTimes;
private int maxPurchasePathLength;
Connecting the two streams and implementing the computation
final FlinkKafkaProducer010<EvaluatedResult> kafkaProducer = new FlinkKafkaProducer010<>(
parameterTool.getRequired("output-topic"),
new EvaluatedResultSchema(),
parameterTool.getProperties());
// connect above 2 streams
DataStream<EvaluatedResult> connectedStream = customerUserEventStream
.connect(configBroadcastStream)
.process(new ConnectedBroadcastProcessFuntion());
connectedStream.addSink(kafkaProducer);
env.execute("UserPurchaseBehaviorTracker");
// (channel, Map<uid, UserEventContainer>)
private final MapStateDescriptor<String, Map<String, UserEventContainer>> userMapStateDesc =
new MapStateDescriptor<>(
"userEventContainerState",
BasicTypeInfo.STRING_TYPE_INFO,
new MapTypeInfo<>(String.class, UserEventContainer.class));
Let's first look at the implementation of processBroadcastElement():
@Override
public void processBroadcastElement(Config value, Context ctx, Collector<EvaluatedResult> out)
throws Exception {
String channel = value.getChannel();
BroadcastState<String, Config> state = ctx.getBroadcastState(configStateDescriptor);
final Config oldConfig = state.get(channel);
if(state.contains(channel)) {
LOG.info("Configured channel exists: channel=" + channel);
LOG.info("Config detail: oldConfig=" + oldConfig + ", newConfig=" + value);
} else {
LOG.info("Config detail: defaultConfig=" + defaultConfig + ", newConfig=" + value);
}
// update config value for configKey
state.put(channel, value);
}
Next comes processElement(), which holds the core of the business logic:
@Override
public void processElement(UserEvent value, ReadOnlyContext ctx,
Collector<EvaluatedResult> out) throws Exception {
String userId = value.getUserId();
String channel = value.getChannel();
EventType eventType = EventType.valueOf(value.getEventType());
Config config = ctx.getBroadcastState(configStateDescriptor).get(channel);
LOG.info("Read config: channel=" + channel + ", config=" + config);
if (Objects.isNull(config)) {
config = defaultConfig;
}
final MapState<String, Map<String, UserEventContainer>> state =
getRuntimeContext().getMapState(userMapStateDesc);
// collect per-user events to the user map state
Map<String, UserEventContainer> userEventContainerMap = state.get(channel);
if (Objects.isNull(userEventContainerMap)) {
userEventContainerMap = Maps.newHashMap();
state.put(channel, userEventContainerMap);
}
if (!userEventContainerMap.containsKey(userId)) {
UserEventContainer container = new UserEventContainer();
container.setUserId(userId);
userEventContainerMap.put(userId, container);
}
userEventContainerMap.get(userId).getUserEvents().add(value);
// check whether a user purchase event arrives
// if true, then compute the purchase path length, and prepare to trigger predefined actions
if (eventType == EventType.PURCHASE) {
LOG.info("Receive a purchase event: " + value);
Optional<EvaluatedResult> result = compute(config, userEventContainerMap.get(userId));
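result.ifPresent(out::collect);
// clear evaluated user's events
userEventContainerMap.remove(userId);
}
// write the map back: state backends that serialize on access (such as RocksDB)
// would otherwise lose the in-place changes made above
state.put(channel, userEventContainerMap);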
}
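The state handling in processElement() follows a simple lifecycle: buffer each user's events, evaluate the path when a PURCHASE arrives, then clear the buffer so the next path starts from zero. The sketch below models only that lifecycle with plain collections instead of Flink state. UserPathTracker and its methods are hypothetical names, and the historyPurchaseTimes check is left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;

// Toy stand-in for the per-user state; plain collections instead of Flink state.
final class UserPathTracker {
    private final Map<String, List<String>> eventsByUser = new HashMap<>();

    // Buffers the event; on PURCHASE, evaluates the path and clears the user's buffer.
    OptionalInt onEvent(String userId, String eventType, int maxPurchasePathLength) {
        List<String> events = eventsByUser.computeIfAbsent(userId, id -> new ArrayList<>());
        events.add(eventType);
        if (!"PURCHASE".equals(eventType)) {
            return OptionalInt.empty();
        }
        int pathLength = events.size();
        eventsByUser.remove(userId); // the next path for this user starts from zero
        return pathLength > maxPurchasePathLength
                ? OptionalInt.of(pathLength)
                : OptionalInt.empty();
    }

    // Number of events currently buffered for the user.
    int bufferedEvents(String userId) {
        List<String> events = eventsByUser.get(userId);
        return events == null ? 0 : events.size();
    }

    public static void main(String[] args) {
        UserPathTracker tracker = new UserPathTracker();
        tracker.onEvent("u1", "VIEW_PRODUCT", 3);
        tracker.onEvent("u1", "VIEW_PRODUCT", 3);
        tracker.onEvent("u1", "ADD_TO_CART", 3);
        System.out.println(tracker.onEvent("u1", "PURCHASE", 3)); // OptionalInt[4]
        System.out.println(tracker.bufferedEvents("u1"));         // 0
    }
}
```

Clearing the buffer on every purchase, whether or not a result is emitted, keeps the state from growing with users who buy repeatedly.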
private Optional<EvaluatedResult> compute(Config config, UserEventContainer container) {
Optional<EvaluatedResult> result = Optional.empty();
String channel = config.getChannel();
int historyPurchaseTimes = config.getHistoryPurchaseTimes();
int maxPurchasePathLength = config.getMaxPurchasePathLength();
int purchasePathLen = container.getUserEvents().size();
if (historyPurchaseTimes < 10 && purchasePathLen > maxPurchasePathLength) {
// sort by event time
container.getUserEvents().sort(Comparator.comparingLong(UserEvent::getEventTimestamp));
final Map<String, Integer> stat = Maps.newHashMap();
container.getUserEvents()
.stream()
.collect(Collectors.groupingBy(UserEvent::getEventType))
.forEach((eventType, events) -> stat.put(eventType, events.size()));
final EvaluatedResult evaluatedResult = new EvaluatedResult();
evaluatedResult.setUserId(container.getUserId());
evaluatedResult.setChannel(channel);
evaluatedResult.setEventTypeCounts(stat);
evaluatedResult.setPurchasePathLength(purchasePathLen);
LOG.info("Evaluated result: " + evaluatedResult.toJSONString());
result = Optional.of(evaluatedResult);
}
return result;
}
./bin/kafka-topics.sh --zookeeper 172.23.4.138:2181,172.23.4.139:2181,172.23.4.140:2181/kafka --create --topic user_events --replication-factor 1 --partitions 1
./bin/kafka-topics.sh --zookeeper 172.23.4.138:2181,172.23.4.139:2181,172.23.4.140:2181/kafka --create --topic app_config --replication-factor 1 --partitions 1
./bin/kafka-topics.sh --zookeeper 172.23.4.138:2181,172.23.4.139:2181,172.23.4.140:2181/kafka --create --topic action_result --replication-factor 1 --partitions 1
bin/flink run -d -c org.shirdrn.flink.broadcaststate.UserPurchaseBehaviorTracker ~/flink-app-jobs.jar --input-event-topic user_events --input-config-topic app_config --output-topic action_result --bootstrap.servers 172.23.4.138:9092 --zookeeper.connect zk01.td.com:2181,zk02.td.com:2181,zk03.td.com:2181/kafka --group.id customer-purchase-behavior-tracker
./bin/kafka-console-producer.sh --topic app_config --broker-list 172.23.4.138:9092
{"channel":"APP","registerDate":"2018-01-01","historyPurchaseTimes":0,"maxPurchasePathLength":6}
./bin/kafka-console-producer.sh --topic user_events --broker-list 172.23.4.138:9092
{"userId":"a9b83681ba4df17a30abcf085ce80a9b","channel":"APP","eventType":"VIEW_PRODUCT","eventTime":"2018-06-12_08:45:24","data":{"productId":126}}
{"userId":"a9b83681ba4df17a30abcf085ce80a9b","channel":"APP","eventType":"VIEW_PRODUCT","eventTime":"2018-06-12_08:57:32","data":{"productId":273}}
{"userId":"a9b83681ba4df17a30abcf085ce80a9b","channel":"APP","eventType":"VIEW_PRODUCT","eventTime":"2018-06-12_09:21:08","data":{"productId":126}}
{"userId":"a9b83681ba4df17a30abcf085ce80a9b","channel":"APP","eventType":"VIEW_PRODUCT","eventTime":"2018-06-12_09:21:49","data":{"productId":103}}
{"userId":"a9b83681ba4df17a30abcf085ce80a9b","channel":"APP","eventType":"VIEW_PRODUCT","eventTime":"2018-06-12_09:21:59","data":{"productId":157}}
{"userId":"a9b83681ba4df17a30abcf085ce80a9b","channel":"APP","eventType":"VIEW_PRODUCT","eventTime":"2018-06-12_09:27:11","data":{"productId":126}}
{"userId":"a9b83681ba4df17a30abcf085ce80a9b","channel":"APP","eventType":"ADD_TO_CART","eventTime":"2018-06-12_09:43:18","data":{"productId":126}}
{"userId":"a9b83681ba4df17a30abcf085ce80a9b","channel":"APP","eventType":"VIEW_PRODUCT","eventTime":"2018-06-12_09:27:11","data":{"productId":126}}
{"userId":"a9b83681ba4df17a30abcf085ce80a9b","channel":"APP","eventType":"PURCHASE","eventTime":"2018-06-12_09:30:28","data":{"productId":126,"price":299.00,"amount":260.00}}
{"userId":"a9b83681ba4df17a30abcf085ce80a9b","channel":"APP","purchasePathLength":9,"eventTypeCounts":{"ADD_TO_CART":1,"PURCHASE":1,"VIEW_PRODUCT":7}}
{"channel":"APP","registerDate":"2018-01-01","historyPurchaseTimes":0,"maxPurchasePathLength":20}