2019. 12. 22. 20:34ㆍGo to 코딩천재/Big Data
1. What?

- Kafka는 비동기 처리를 위한 메시징 큐이다.
Source Application과 Target Application 사이의 데이터 전달자 역할을 한다.
- 비동기 메시징 시스템의 대표적 예시인 mail과 비슷하다.
보내는 사람이 원하는 사람에게 메일을 보내면 메일 서버에 저장되어 있게 된다.
덕분에 받는 사람은 자신이 원할 때 메일을 볼 수 있다.
-> 보내는 사람 = producer
-> 받는 사람 = consumer
-> 메일 서버 = kafka 서버
- Kafka는 데이터를 제공하는 프로듀서(소스 앱), 데이터를 이용하는 컨슈머(타겟 앱) 기능을 제공한다.
이는 라이브러리로 제공되므로 앱에서 구현 가능하다.

- Kafka의 데이터는 Topic이라는 공간에 저장되며, Topic은 여러 개의 Partition으로 나뉜다.
* Kafka 구조 = Topic 1 + Topic 2 + ...
* Topic 구조 = Partition 1 + Partition 2 + ...
* Topic에 이름을 지정하여 관리한다. (ex : click_log, send_msg, location_log 등)
* Partition에 데이터가 저장되는 형식은 FIFO이다. (=Queue)
* Partition의 개수를 늘릴 수는 있지만, 줄일 수는 없다.
-> Partition을 왜 늘릴까?
컨슈머의 개수를 늘려서 데이터 처리를 분산시킬 수 있어서!
* Partition의 데이터 삭제는 지정한 시간 및 크기에 따라 이루어진다.
-> log.retention.ms : record 최대 보존 시간
-> log.retention.byte : record 최대 보존 크기 (byte)
2. Why?
- 소스 앱과 타켓 앱의 개수가 늘어나면서, 주고 받는 데이터의 종류가 복잡해졌다.
중간에서 데이터를 한 번에 보관 및 관리해 줄 공간이 필요해졌다.
- LinkedIn에서 구직 및 채용 정보들을 한 곳에서 처리(발행/구독)할 수 있는 플랫폼으로 개발한 것이 Kafka이다.
3. Positive?
- 기존의 메시지 시스템의 큰 모델이었던, Queue 방식과 publish-subscribe 방식 두 가지를 다 가능하도록 만들었다.
Queue | Publish-Subscribe |
각 레코드는 Consumer 하나에 전달된다. | 모든 레코드가 모든 Consumer에게 전달된다. |
다중 subscriber가 아니여서, 한 프로세스가 읽은 데이터는 사라진다. | 다중 subscriber지만, 데이터 별 전달 조정이 불가능하다. |
Kafka는 두 방식의 장점을 Consumer그룹을 통해 가능하게 하였다.
Consumer그룹이 다르면 한 번 읽은 데이터도 재사용 가능하며, Consumer그룹으로 같은 데이터의 처리를 여러 개로 구분할 수 있다.
- 서버 이슈 및 전원 꺼짐에도 데이터를 손실없이 복구할 수 있다. (fault tolerant = 결함감내)
- 낮은 지연, 높은 처리량으로 많은 데이터 처리에 유리하다.
4. How?
- Kafka에 데이터를 전송하는 과정
[파티션이 1개인 토픽]
- 데이터는 끝에서부터 순차적으로 저장됨.
- 컨슈머가 파티션에서 데이터를 가져가도, 원본 데이터는 그대로 있다.
-> 컨슈머1이 가져간 데이터를 컨슈머2가 가져갈 수 있음. (단, 컨슈머 그룹이 다르고, auto.offset.reset=earliest이어야 함)
=> 같은 데이터를 다르게 여러 번 사용 가능하다. (카프카를 쓰는 주요 이유!)
[파티션이 2개 이상인 토픽]
- 데이터를 토픽에 저장할 때, 키를 지정하게 된다.
-> 키가 null && 기본 파티셔너 사용 = 라운드 로빈으로 파티션 할당
-> 키가 있고 && 기본 파티셔너 사용 = 키의 해시값으로 파티션 할당
-> 같은 키를 가진 데이터는 같은 파티션에 저장 됨
-> 하지만, 키를 사용하던 중간에 파티션 개수를 늘리면 더이상 키 값의 일관성을 보장할 수 없게 된다.
5. Example
1. 카프카 사용할 준비하기
- Source Application에 kafka-client 라이브러리를 추가한다.
- 보통 gradle, maven 등으로 추가한다.
- 이때 kafka-broker의 버전과 호환되는 버전의 라이브러리를 추가해야 한다.
-> 아래 표 참고
Kafka broker 버젼 |
Java client 호환성 |
Kafka stream 호환성 |
Kafka Connect 호환성 |
0.10.0 |
0.10.1을 제외한 모든 버젼 가능 |
0.10.0 버젼만 가능 |
0.10.1을 제외한 모든 버젼 가능 |
0.10.1 |
모든 버젼 가능 |
모든 버젼 가능 |
0.10.1 이하 버젼 가능 |
0.10.2 |
모든 버젼 가능 |
모든 버젼 가능 |
모든 버젼 가능 |
0.11.0 |
모든 버젼 가능 |
모든 버젼 가능 |
모든 버젼 가능 |
1.0.0 |
모든 버젼 가능 |
모든 버젼 가능 |
모든 버젼 가능 |
(표 출처 : https://blog.voidmainvoid.net/193)
2. 프로듀서로 카프카에 데이터 전송하기
public class producer {
public static void main(String[] args) throws IOExeption {
Properties configs = new Properties();
configs.put("bootstrap.servers", "localhost:9092");
configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer producer = new KafkaProducer<String, String>(configs);
ProducerRecord record = new ProducerRecord<String, String>("click_log", "login");
producer.send(record);
producer.close();
}
}
3. 컨슈머로 카프카 서버의 데이터 받기
public class Consumer {
public static void main(String[] args) {
Properties configs = new Properties();
configs.put("bootstrap.servers", "localhost:9092"); // kafka server host, port
configs.put("session.timeout.ms", "10000"); // session
configs.put("group.id", "test20180604"); // topic
configs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // key
deserializer
configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // value deserializer
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs); // consumer
consumer.subscribe(Arrays.asList("test20180604")); // topic 설정
// producer의 message 출력
while (true) {
ConsumerRecords<String, String> records = consumer.poll(500);
for (ConsumerRecord<String, String> record : records) {
String s = record.topic();
if ("click_log".equals(s)) {
System.out.println(record.value());
} else {
throw new IllegalStateException("get message on topic " + record.topic());
}
}
}
}
}
(예제 출처 : http://junil-hwang.com/blog/kafka-java/)
개념 참고 : https://www.youtube.com/watch?v=aAu0FE3nvbk
'Go to 코딩천재 > Big Data' 카테고리의 다른 글
[Kafka] 용어 알아가기 (0) | 2019.12.29 |
---|---|
[Spark] 스파크 개념잡기 (0) | 2019.12.15 |
[Hadoop] Hadoop 개념 잡기 (2) | 2019.12.02 |