Atguigu Big Data Technology: Movie Recommender System
2019-09-18 09:41
5.3.5 Updating the Real-Time Recommendation Result
Once the array of candidate movies with their recommendation priorities, updatedRecommends, has been computed, it is sent to the web backend server, where it is merged with that userId's previous real-time recommendation result, recentRecommends, duplicates are replaced, and the K movies with the highest priorities are selected as the new real-time recommendation. Specifically:
a. Merge: take the union of updatedRecommends and recentRecommends to form a new array;
b. Replace (deduplicate): when updatedRecommends and recentRecommends contain the same movieId, the priority stored in recentRecommends comes from the previous round and is therefore stale, so it is discarded and replaced by the updated priority for that movieId from updatedRecommends;
c. Select top K: from the merged, deduplicated array, pick the K movies with the highest recommendation priority as the final result of this round of real-time recommendation.
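The merge/replace/top-K steps above can be sketched as follows. This is a minimal illustration, not the project's actual backend code: the `Recommendation` class, the method name `mergeTopK`, and the sample priorities are all assumptions made for the sketch.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MergeTopK {
    // A (movieId, priority) pair; the class name is an assumption for this sketch.
    static class Recommendation {
        final int movieId;
        final double priority;
        Recommendation(int movieId, double priority) {
            this.movieId = movieId;
            this.priority = priority;
        }
    }

    // Merge the previous result with the updated one: on a duplicate movieId,
    // the updated priority overwrites the stale one; then keep the top K by priority.
    static List<Recommendation> mergeTopK(List<Recommendation> recentRecommends,
                                          List<Recommendation> updatedRecommends,
                                          int k) {
        Map<Integer, Recommendation> merged = new LinkedHashMap<>();
        for (Recommendation r : recentRecommends) merged.put(r.movieId, r);
        for (Recommendation r : updatedRecommends) merged.put(r.movieId, r); // replaces duplicates
        List<Recommendation> all = new ArrayList<>(merged.values());
        all.sort(Comparator.comparingDouble((Recommendation r) -> r.priority).reversed());
        return all.subList(0, Math.min(k, all.size()));
    }

    public static void main(String[] args) {
        List<Recommendation> recent = List.of(
                new Recommendation(1, 3.0), new Recommendation(2, 4.5));
        List<Recommendation> updated = List.of(
                new Recommendation(2, 2.0), new Recommendation(3, 5.0));
        // movie 2's old priority (4.5) is replaced by 2.0, then the top 2 are kept
        for (Recommendation r : mergeTopK(recent, updated, 2)) {
            System.out.println(r.movieId + ":" + r.priority);
        }
    }
}
```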
5.4 End-to-End Testing of the Real-Time System
The real-time recommendation data flows through our system as: business system -> log files -> Flume log collection -> Kafka Streams cleaning and preprocessing -> Spark Streaming computation. After finishing the code of the real-time recommendation service, we should test it together with the other components to make sure the whole system runs correctly.
5.4.1 Start the Basic Components of the Real-Time System
Start the real-time recommender StreamingRecommender, as well as mongodb and redis.
5.4.2 Start zookeeper
bin/zkServer.sh start
5.4.3 Start kafka
bin/kafka-server-start.sh -daemon ./config/server.properties
5.4.4 Build the Kafka Streams Program
Create a new module named KafkaStreaming under recommender; it preprocesses the log data and filters out the content we need. The pom.xml file needs the following dependencies:
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>0.10.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.2.1</version>
    </dependency>
</dependencies>
<build>
    <finalName>kafkastream</finalName>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <archive>
                    <manifest>
                        <mainClass>com.atguigu.kafkastream.Application</mainClass>
                    </manifest>
                </archive>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
Under src/main/java, create a new Java class com.atguigu.kafkastream.Application (matching the mainClass configured above):
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TopologyBuilder;

public class Application {
    public static void main(String[] args) {
        String brokers = "localhost:9092";
        String zookeepers = "localhost:2181";

        // Define the input and output topics
        String from = "log";
        String to = "recommender";

        // Kafka Streams configuration
        Properties settings = new Properties();
        settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "logFilter");
        settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeepers);
        StreamsConfig config = new StreamsConfig(settings);

        // Topology builder
        TopologyBuilder builder = new TopologyBuilder();

        // Define the stream-processing topology
        builder.addSource("SOURCE", from)
                .addProcessor("PROCESS", () -> new LogProcessor(), "SOURCE")
                .addSink("SINK", to, "PROCESS");

        KafkaStreams streams = new KafkaStreams(builder, config);
        streams.start();
    }
}
This program consumes messages from the topic "log", processes them, and forwards them under the new topic "recommender".
The stream processor, LogProcessor.java:
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

public class LogProcessor implements Processor<byte[], byte[]> {

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void process(byte[] dummy, byte[] line) {
        String input = new String(line);
        // Filter log messages by prefix and extract the content that follows it
        if (input.contains("MOVIE_RATING_PREFIX:")) {
            System.out.println("movie rating coming!!!!" + input);
            input = input.split("MOVIE_RATING_PREFIX:")[1].trim();
            context.forward("logProcessor".getBytes(), input.getBytes());
        }
    }

    @Override
    public void punctuate(long timestamp) {
    }

    @Override
    public void close() {
    }
}
After completing the code, start Application.
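The core of LogProcessor is the prefix filter, which can be checked in isolation before wiring up Kafka. The helper name `extractRating` and the sample log line below (including its userId|movieId|score|timestamp payload) are assumptions for this sketch; only the MOVIE_RATING_PREFIX marker comes from the code above.

```java
public class PrefixExtractDemo {
    // Same filtering logic as LogProcessor.process: keep only lines containing
    // the rating prefix, and return the payload that follows it.
    static String extractRating(String input) {
        if (input.contains("MOVIE_RATING_PREFIX:")) {
            return input.split("MOVIE_RATING_PREFIX:")[1].trim();
        }
        return null; // not a rating line; LogProcessor simply drops it
    }

    public static void main(String[] args) {
        // Hypothetical log line; the payload format is an assumption.
        String line = "INFO 2019-09-18 MOVIE_RATING_PREFIX:1|1029|5.0|1563707399";
        System.out.println(extractRating(line));                // prints 1|1029|5.0|1563707399
        System.out.println(extractRating("INFO unrelated line")); // prints null
    }
}
```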
5.4.5 Configure and Start flume
Create log-kafka.properties in flume's conf directory to configure the flume-to-kafka connection:
agent.sources = exectail
agent.channels = memoryChannel
agent.sinks = kafkasink

# For each one of the sources, the type is defined
agent.sources.exectail.type = exec
# Absolute path of the log file to collect; change it to your own log directory
agent.sources.exectail.command = tail -f /mnt/d/Projects/BigData/MovieRecommenderSystem/businessServer/src/main/log/agent.log
agent.sources.exectail.interceptors = i1
agent.sources.exectail.interceptors.i1.type = regex_filter
# Regex matching the log prefix to keep
agent.sources.exectail.interceptors.i1.regex = .+MOVIE_RATING_PREFIX.+
# The channel can be defined as follows.
agent.sources.exectail.channels = memoryChannel

# Each sink's type must be defined
agent.sinks.kafkasink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkasink.kafka.topic = log
agent.sinks.kafkasink.kafka.bootstrap.servers = localhost:9092
agent.sinks.kafkasink.kafka.producer.acks = 1
agent.sinks.kafkasink.kafka.flumeBatchSize = 20
# Specify the channel the sink should use
agent.sinks.kafkasink.channel = memoryChannel

# Each channel's type is defined.
agent.channels.memoryChannel.type = memory
# Other config values specific to each type of channel (sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent.channels.memoryChannel.capacity = 10000
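The regex_filter interceptor above decides which tailed lines reach Kafka, so it is worth sanity-checking the pattern against a sample line. The sample log lines below are assumptions; only the regex string itself comes from the configuration.

```java
import java.util.regex.Pattern;

public class FlumeRegexCheck {
    public static void main(String[] args) {
        // Same pattern as agent.sources.exectail.interceptors.i1.regex
        Pattern p = Pattern.compile(".+MOVIE_RATING_PREFIX.+");
        // Hypothetical sample lines; the surrounding log format is an assumption.
        String ratingLine = "INFO MOVIE_RATING_PREFIX:1|1029|5.0|1563707399";
        String otherLine = "INFO user logged in";
        System.out.println(p.matcher(ratingLine).matches()); // true  -> line is forwarded
        System.out.println(p.matcher(otherLine).matches());  // false -> line is dropped
    }
}
```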
After configuring, start flume:
./bin/flume-ng agent -c ./conf/ -f ./conf/log-kafka.properties -n agent -Dflume.root.logger=INFO,console
5.4.6 Start the Business System Backend
Add the business code to the system. Note that in log4j.properties under src/main/resources/, the value of log4j.appender.file.File should be replaced with your own log path, which must match the path configured in flume.
Start the business system backend and visit localhost:8088/index.html; rate a movie and check whether the real-time recommendation list changes.
This tutorial is produced by the Atguigu Education Big Data Research Institute. Please credit the source when reposting, and follow the Atguigu WeChat official account (atguigu) to learn more.