본문 바로가기
카테고리 없음

Kafka 설치 및 스프링 연동

by 수수남매 2023. 12. 19.
  • 스프링에서 Kafka 사용을 위한 라이브러리(Spring for Apache Kafka)를 제공하며 버전 Compatibility는 다음과 같음
  • 패키지 의존성 주입
    // Kafka
    implementation 'org.springframework.kafka:spring-kafka'​
  • Kafka는 zookeeper와 kafka가 별도로 설치되어야 하고 zookeeper 실행 후 kafka를 실행하여야 하므로 docker-compose를 이용하는 것이 유리할 것으로 판단함
  • docker image는 zookeeper는 공식 이미지가 있지만 kafka는 공식 이미지가 없음, 대표적으로 bitnami와 confluentInc의 이미지가 쓰이는 걸로 보이며, wurstmeister를 사용한 사례도 블로그에서 많이 보임
    참고: https://log-laboratory.tistory.com/205
  • confluentInc의 version compatibility
  • 1-zookeeper & 2-kafka broker 설정(localhost) docker-compose.yml
    version: '2'
    
    services:
      zookeeper:
        image: confluentinc/cp-zookeeper:7.5.2
        container_name: zookeeper
        environment:
          # zookeeper 식별 ID
          ZOOKEEPER_SERVER_ID: 1
          # container 내부 port
          ZOOKEEPER_CLIENT_PORT: 2181
          # cluster 구성 시 동기화를 위한 tick time
          ZOOKEEPER_TICK_TIME: 2000
          # 초기화(리더와의 connection) 제한 시간 = tick_time * init_limit, 멀티 브로커에서 유효
          ZOOKEEPER_INIT_LIMIT: 5
          # 리더와 팔로워의 Sync time = tick_time * sync_limit, 멀티 브로커에서 유효
          ZOOKEEPER_SYNC_LIMIT: 2 
        ports:
          - "22181:2181"
    
      kafka1:
        image: confluentinc/cp-kafka:7.5.2
        container_name: kafka1
        depends_on:
          - zookeeper
        ports:
          - "19093:19093" # host port:container port, 외부와 컨테이너 연결 포트
        environment:
          # kafka 식별 ID
          KAFKA_BROKER_ID: 1
          KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
          # 수신 대기 리스너 설정
          KAFKA_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://kafka1:19093
          # client connection을 위한 리스너 설정
          KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://localhost:19093
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
          # docker 내부 통신에 사용할 listener 이름
          KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
          # 싱글 브로커는 1로 지정
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
          # consumer가 그룹 조인할 때의 대기시간
          KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
    
      kafka2:
        image: confluentinc/cp-kafka:7.5.2
        container_name: kafka2
        depends_on:
          - zookeeper
        ports:
          - "29093:29093"
        environment:
          KAFKA_BROKER_ID: 2
          KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
          KAFKA_LISTENERS: INTERNAL://kafka2:29092,EXTERNAL://kafka2:29093
          KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka2:29092,EXTERNAL://localhost:29093
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
          KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0​
  • application.properties 설정 (이 설정으로 스프링이 auto-configuration)
    # Kafka
    spring.kafka.bootstrap-servers=localhost:19093, localhost:29093
    
    spring.kafka.producer.bootstrap-servers=localhost:19093, localhost:29093
    spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
    
    spring.kafka.consumer.bootstrap-servers=localhost:19093, localhost:29093
    spring.kafka.consumer.group-id=eventConsumers
    spring.kafka.consumer.auto-offset-reset=earliest
    spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer​