[Kafka] Kafka 개념 및 기본사용법

2019. 12. 22. 20:34Go to 코딩천재/Big Data

 

1. What?

(이미지 출처 : https://www.popit.kr/kafka-%EC%9A%B4%EC%98%81%EC%9E%90%EA%B0%80-%EB%A7%90%ED%95%98%EB%8A%94-%EC%B2%98%EC%9D%8C-%EC%A0%91%ED%95%98%EB%8A%94-kafka/)

 

- Kafka는 비동기 처리를 위한 메시징 큐이다.

  Source Application과 Target Application 사이의 데이터 전달자 역할을 한다.

 

- 비동기 메시징 시스템의 대표적 예시인 mail과 비슷하다.

  보내는 사람이 원하는 사람에게 메일을 보내면 메일 서버에 저장되어 있게 된다.

  덕분에 받는 사람은 자신이 원할 때 메일을 볼 수 있다.

  -> 보내는 사람 = producer

  -> 받는 사람 = consumer

  -> 메일 서버 = kafka 서버

 

- Kafka는 데이터를 제공하는 프로듀서(소스 앱), 데이터를 이용하는 컨슈머(타겟 앱) 기능을 제공한다.
  이는 라이브러리로 제공되므로 앱에서 구현 가능하다.

 

 

카프카의 기본 구성

- Kafka의 데이터는 Topic이라는 공간에 저장되며, Topic은 여러 개의 Partition으로 나뉜다.

   * Kafka 구조 = Topic 1 + Topic 2 + ...

   * Topic 구조 = Partition 1 + Partition 2 + ...

   * Topic에 이름을 지정하여 관리한다. (ex : click_log, send_msg, location_log 등)

   * Partition에 데이터가 저장되는 형식은 FIFO이다. (=Queue)

   * Partition의 개수를 늘릴 수는 있지만, 줄일 수는 없다.

     -> Partition을 왜 늘릴까?

        컨슈머의 개수를 늘려서 데이터 처리를 분산시킬 수 있어서!

   * Partition의 데이터 삭제는 지정한 시간 및 크기에 따라 이루어진다.

    -> log.retention.ms : record 최대 보존 시간

    -> log.retention.byte : record 최대 보존 크기 (byte)

 

 

2. Why?

- 소스 앱과 타켓 앱의 개수가 늘어나면서, 주고 받는 데이터의 종류가 복잡해졌다.

  중간에서 데이터를 한 번에 보관 및 관리해 줄 공간이 필요해졌다.

- LinkedIn에서 구직 및 채용 정보들을 한 곳에서 처리(발행/구독)할 수 있는 플랫폼으로 개발한 것이 Kafka이다.

 

 

3. Positive?

- 기존의 메시지 시스템의 큰 모델이었던, Queue 방식과 publish-subscribe 방식 두 가지를 다 가능하도록 만들었다.

Queue Publish-Subscribe
각 레코드는 Consumer 하나에 전달된다. 모든 레코드가 모든 Consumer에게 전달된다.
다중 subscriber가 아니여서, 한 프로세스가 읽은 데이터는 사라진다. 다중 subscriber지만, 데이터 별 전달 조정이 불가능하다.

Kafka는 두 방식의 장점을 Consumer그룹을 통해 가능하게 하였다.

Consumer그룹이 다르면 한 번 읽은 데이터도 재사용 가능하며, Consumer그룹으로 같은 데이터의 처리를 여러 개로 구분할 수 있다.

 

- 서버 이슈 및 전원 꺼짐에도 데이터를 손실없이 복구할 수 있다. (fault tolerant = 결함감내)

- 낮은 지연, 높은 처리량으로 많은 데이터 처리에 유리하다.

 

 

4. How?

- Kafka에 데이터를 전송하는 과정

 

[파티션이 1개인 토픽]

- 데이터는 끝에서부터 순차적으로 저장됨.
- 컨슈머가 파티션에서 데이터를 가져가도, 원본 데이터는 그대로 있다.
  -> 컨슈머1이 가져간 데이터를 컨슈머2가 가져갈 수 있음. (단, 컨슈머 그룹이 다르고, auto.offset.reset=earliest이어야 함)
  => 같은 데이터를 다르게 여러 번 사용 가능하다. (카프카를 쓰는 주요 이유!)

[파티션이 2개 이상인 토픽]

- 데이터를 토픽에 저장할 때, 키를 지정하게 된다.
  -> 키가 null && 기본 파티셔너 사용 = 라운드 로빈으로 파티션 할당
  -> 키가 있고 && 기본 파티셔너 사용 = 키의 해시값으로 파티션 할당
      -> 같은 키를 가진 데이터는 같은 파티션에 저장 됨
      -> 하지만, 키를 사용하던 중간에 파티션 개수를 늘리면 더이상 키 값의 일관성을 보장할 수 없게 된다.

 

 

5. Example

1. 카프카 사용할 준비하기

- Source Application에 kafka-client 라이브러리를 추가한다.
  - 보통 gradle, maven 등으로 추가한다.
  - 이때 kafka-broker의 버전과 호환되는 버전의 라이브러리를 추가해야 한다.
     -> 아래 표 참고

 Kafka broker 버젼

Java client 호환성 

 Kafka stream 호환성

 Kafka Connect 호환성

 0.10.0

 0.10.1을 제외한 모든 버젼 가능

 0.10.0 버젼만 가능

 0.10.1을 제외한 모든 버젼 가능 

 0.10.1

 모든 버젼 가능

 모든 버젼 가능

 0.10.1 이하 버젼 가능

 0.10.2

 모든 버젼 가능

 모든 버젼 가능

 모든 버젼 가능

 0.11.0

 모든 버젼 가능

 모든 버젼 가능

 모든 버젼 가능

 1.0.0

 모든 버젼 가능

 모든 버젼 가능

 모든 버젼 가능

(표 출처 : https://blog.voidmainvoid.net/193)

 

 

2. 프로듀서로 카프카에 데이터 전송하기

public class producer {
   public static void main(String[] args) throws IOExeption {
      Properties configs = new Properties();
      configs.put("bootstrap.servers", "localhost:9092");
      configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

      KafkaProducer producer = new KafkaProducer<String, String>(configs);
      ProducerRecord record = new ProducerRecord<String, String>("click_log", "login");
      producer.send(record);
      producer.close();
   }
}

 

3. 컨슈머로 카프카 서버의 데이터 받기

public class Consumer {
   public static void main(String[] args) {
      Properties configs = new Properties();
      configs.put("bootstrap.servers", "localhost:9092"); // kafka server host, port     
      configs.put("session.timeout.ms", "10000"); // session
      configs.put("group.id", "test20180604"); // topic
      configs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // key
      deserializer 
      configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // value deserializer
      KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs); // consumer
      consumer.subscribe(Arrays.asList("test20180604")); // topic 설정

      // producer의 message 출력
      while (true) {
         ConsumerRecords<String, String> records = consumer.poll(500);
         for (ConsumerRecord<String, String> record : records) {
            String s = record.topic();
            if ("click_log".equals(s)) {
               System.out.println(record.value());
              } else {
                 throw new IllegalStateException("get message on topic " + record.topic());
              }
            }
          }
       }
}

(예제 출처 : http://junil-hwang.com/blog/kafka-java/)

 

개념 참고 : https://www.youtube.com/watch?v=aAu0FE3nvbk

 

'Go to 코딩천재 > Big Data' 카테고리의 다른 글

[Kafka] 용어 알아가기  (0) 2019.12.29
[Spark] 스파크 개념잡기  (0) 2019.12.15
[Hadoop] Hadoop 개념 잡기  (2) 2019.12.02