Star

云计算知识整理:使用Apache Kafka和Samza进行流处理

本文是关于Stream Processing with Kafka and Samza的介绍以及总结反思。

流处理

如今实时数据处理的需求越来越高,实时数据的来源包括传感器数据(比如物联网设备),社交网络交互,实时商业数据等。这些情况下需要极其的延迟率。比如LinkedIn就需要用实时的广告点击数据来不断扩充广告架构。类似Hadoop和Spark这种数据获取和数据处理分离的方式无法达到低延迟的实时处理需求。所以在这里就介绍一写管理和处理大量实时数据的流处理框架。

Apache Kafka

Kafka是分布式发布-订阅消息系统。由linkedin开发,后来成为Apache项目的一部分。发布者把消息放在不同的classes里,并不知道订阅者会如何使用这些数据。而订阅者可以订阅特定的消息并且只能收到相应的消息。Kafka使用commit log来保持数据,commit log是按顺序的,不可修改的,只能添加的数据结构。Kafka最大的优势是它提供完整的数据结构,所有的组织里的系统能够独立和可靠地获取数据。可以认为Kafka是流数据源。以下为一些主要Kafka术语:

  • Topic: 表示一个用户定义的类型,消息会在这个类别下发布。主要用partitioned log来维护。

  • Producers: 用来向Kafka集群中发布一个或多个topic信息的进程。

  • Consumers: 用来向Kafka集群中读取消息的进程。

  • Partitions: topics被分为多个partitions。一个partition代表一个并行单位。总的来说,partition越多,吞吐量越多。每个partition中的每个信息都有特定的偏移量,这样数据消费者能借此定位。简单来说,我们认为Kafka会根据key来给数据排序并提供,类似于MapReduce中的Map和Shuffle的阶段。

  • Brokers: Brokers用来负责数据持久化和复制。brokers会和producers交流来发布信息给Kafka集群,和consumers交谈来获取信息。

值得注意的是,kafka不会运行处理数据,只是一种存储和分类流数据的一种方式。在这个Project里,你需要用Samza来处理Kafka提供的数据流。

可以看下这个视频(通常搞不懂一个概念的时候我都是查youtube_(:зゝ∠)_ )Understanding Kafka with Legos

Apache Samza

Samza是由Linkedin开发的分布式流处理框架,以下为三层流处理框架中的关键组件:

  • Streaming:这一层是用partitioned stream的方式提供输入,这里也就是Kafka
  • Execution:在不同的机器间调度协调任务,这里使用YARN
  • Processing:负责具体的数据处理,这里使用Samza

Samza相关的术语包括:

  • Streams:等同于Kafka的topics
  • Jobs:使用Samza API来从一个或多个流读取和处理数据。一个Job可能被分割成不同的task,每个task可能会使用输入流中的一个或多个partition
  • Stateful Stream Processing:流处理可以分为有状态和无状态。

Samza结构:

Samza API

Samza API简单抽象,以一个ExampleCode为例,看Twitter类似的实时信息如何展示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class FanOutTask implements StreamTask, InitableTask, WindowableTask {
private KeyValueStore<String, String> socialGraph;
private KeyValueStore<String, Map<String, Object>> userTimeline;
private long numMessages = 0;
@Override
@SuppressWarnings("unchecked")
public void init(Config config, TaskContext context) throws Exception {
socialGraph = (KeyValueStore<String, String>) context.getStore("social-graph");
userTimeline = (KeyValueStore<String, Map<String, Object>>) context.getStore("user-timelin
e");
}
@Override
@SuppressWarnings("unchecked")
public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordin
ator coordinator) {
String incomingStream = envelope.getSystemStreamPartition().getStream();
if (incomingStream.equals(NewsfeedConfig.FOLLOWS_STREAM.getStream())) {
processFollowsEvent((Map<String, Object>) envelope.getMessage());
} else if (incomingStream.equals(NewsfeedConfig.MESSAGES_STREAM.getStream())) {
processMessageEvent((Map<String, Object>) envelope.getMessage(), collector);
} else {
throw new I

所有的Samza Jobs都要完成 StreamTask接口,有的时候还需要完成InitableTask和WindowableTask接口。