본문 바로가기
IT 서비스/시퀀스

Kafka + debezium mariadb connector 구축 ( kraft모드 + debezium 3.1버전)

by agong이 2025. 6. 5.

환경

mariadb : 11.7

kafka-connect : debezium connect : 3.1.2

kafka:  cp-kafka 7.9.1 버전 이미지, kraft모드

mariadb : 11.7

 

참고

debezium 2.9이상부터는 mariadb connector와 mysqcl connector가 분리되었습니다.  

2.7버전 이상부터 debezium은 docker-hub가 아닌 quay.io를 통해 배포됩니다.

 

 

1. 컨테이너 생성

mairadb, kafka, kafka-ui(선택), kafka-connect 컨테이너를 생성합니다.

services:
    mariadb:
      image: mariadb:latest
      container_name: seqeunce-mariadb-server
      environment:
        MYSQL_ROOT_PASSWORD: 1234
        MYSQL_DATABASE: sequence
        MYSQL_USER: sequence123
        MYSQL_PASSWORD: sequence123
      command: >
      	--server-id=1
      	--binlog-format=ROW
      	--log-bin=mysql-bin
      	--binlog_legacy_event_pos=ON
      	--binlog_annotate_row_events=ON
      ports:
        - "3307:3306"
      networks:
      	- sequence_default  # mariadb와 동일한 외부 네트워크 사용
      kafka:
    image: confluentinc/cp-kafka:latest  # Kafka 버전, KRaft 모드 지원 이미지
    container_name: sequence-kafka
    ports:
      - "9092:9092"  # Kafka 브로커 접근 포트 (클라이언트용)
    environment:
      KAFKA_NODE_ID: 1  # KRaft 노드 ID
      KAFKA_PROCESS_ROLES: 'broker,controller'  # 역할 지정
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@sequence-kafka:29093'  # 컨트롤러 투표 노드
      KAFKA_LISTENERS: 'PLAINTEXT://:19092,CONTROLLER://:29093,PLAINTEXT_HOST://0.0.0.0:29092,EXTERNAL://:9092'  # 리스너 정의
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka:19092,EXTERNAL://localhost:9092'  # 브로커가 광고할 리스너들
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,EXTERNAL:PLAINTEXT'  # 보안 프로토콜 매핑
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'  # 컨트롤러용 리스너
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'  # 브로커 간 통신용 리스너
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      CLUSTER_ID: 'ciWo7IWazngRchmPES6q5A=='  # 고정된 클러스터 ID
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'  # 로그 디렉토리
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: true  # 토픽 자동 생성 활성화
      KAFKA_NUM_PARTITIONS: 3  # 기본 파티션 수
      KAFKA_DEFAULT_REPLICATION_FACTOR: 1  # 기본 복제 팩터
    networks:
      - sequence_default  # mariadb와 동일한 외부 네트워크 사용

  kafka-connect:
    image: quay.io/debezium/connect:3.1.2.Final  # Debezium Kafka Connect 3.1 이미지
    container_name: sequence-kafka-connect
    ports:
      - "8083:8083"  # Kafka Connect REST API 포트
    environment:
      - BOOTSTRAP_SERVERS=kafka:19092  # Kafka 브로커 주소
      - GROUP_ID=1  # 커넥트 클러스터 그룹 ID
      - CONFIG_STORAGE_TOPIC=connect-configs  # 커넥터 설정 저장용 내부 토픽
      - OFFSET_STORAGE_TOPIC=connect-offsets  # 커넥터 오프셋 저장 토픽
      - STATUS_STORAGE_TOPIC=connect-status  # 커넥터 상태 저장 토픽
      - KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter  # 메시지 키 JSON 직렬화
      - VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter  # 메시지 값 JSON 직렬화
      - CONFIG_STORAGE_REPLICATION_FACTOR=1  # 설정 토픽 복제 수
      - OFFSET_STORAGE_REPLICATION_FACTOR=1  # 오프셋 토픽 복제 수
      - STATUS_STORAGE_REPLICATION_FACTOR=1  # 상태 토픽 복제 수
      - CONNECT_TOPIC_CREATION_ENABLE=true  # Connect 토픽 생성 활성화
    depends_on:
      - kafka  # Kafka가 먼저 실행되어야 함
    networks:
      - sequence_default

  kafka-ui:
    image: provectuslabs/kafka-ui:latest  # Kafka UI 최신 이미지
    container_name: sequence-kafka-ui
    ports:
      - "8084:8080"  # Kafka UI 접속 포트
    environment:
      - KAFKA_CLUSTERS_0_NAME=local-kafka  # Kafka UI에서 표시될 클러스터 이름
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:19092  # 연결할 Kafka 브로커 주소
    depends_on:
      - kafka
    networks:
      - sequence_default
volumes:
  kafka_data:  # Kafka 로그/메타데이터용 볼륨
  mariadb_data:/var/lib/mysql


networks:
  sequence_default:
    external: true  # 외부에서 생성한 sequence 네트워크 사용

 

2. MariaDB connector 설정파일 작성

mariadb-connector.json 파일을 작성합니다. 해당 파일을 api 형태로 kafka-connect 서버에 전송하기 때문에 파일 위치는 상관없습니다.

 

{
  "name": "mariadb-connector",
  "config": {
    "connector.class": "io.debezium.connector.mariadb.MariaDbConnector",
    "database.hostname": "sequence-mariadb-server",
    "database.port": "3306",
    "database.user": "debezium-user",
    "database.password": "password",
    "database.server.id": "1",
    "database.server.name": "sequence",
    "database.include.list": "sequence",
    "database.useGtid": "false",
    "table.include.list": "sequence.report",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schemahistory.sequence",
    "schema.history.internal.recovery.attempts": "3",
    "include.schema.changes": "true",
    "snapshot.mode": "initial",
    "database.connectionTimeZone": "Asia/Seoul",
    "database.allowPublicKeyRetrieval": "true",
    "database.sslMode": "disable",
    "database.history.store.only.captured.tables.ddl": "true",
    "topic.prefix": "dbhistory"
  }
}

 

해당 설정파일을 사용하여 mariadb-connector등록합니다

curl -X POST http://localhost:8083/connectors   -H "Content-Type: application/json"   -d @mariadb-connector.json

 

등록완료(제대로 등록됐는지 확인하기 위해서는 kafka-connect의 로그를 확인해보아야합니다)

{"name":"mariadb-connector","config":{"connector.class":"io.debezium.connector.mariadb.MariaDbConnector","database.hostname":"sequence-mariadb-server","database.port":"3306","database.user":"kafka","database.password":"kafka","database.server.id":"1","database.server.name":"sequence","database.include.list":"sequence","table.include.list":"sequence.report","database.history.kafka.bootstrap.servers":"kafka:9092","database.history.kafka.topic":"dbhistory.sequence","include.schema.changes":"true","snapshot.mode":"initial","database.connectionTimeZone":"Asia/Seoul","database.allowPublicKeyRetrieval":"true","database.sslMode":"disable","database.history.store.only.captured.tables.ddl":"true","topic.prefix":"dbhistory","name":"mariadb-connector"},"tasks":[],"type":"source"}

 

참고

connector 삭제 명령어

curl -X DELETE http://localhost:8083/connectors/mariadb-connector

 

🚨주의

토픽을 자동으로 생성하지 못해 커넥터 등록에서 계속해서 오류가 발생하였습니다.

2025-06-05 12:39:04,880 ERROR  ||  WorkerSourceTask{id=mariadb-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted   [org.apache.kafka.connect.runtime.WorkerTask]
io.debezium.DebeziumException: The db history topic is missing. You may attempt to recover it by reconfiguring the connector to recovery.
        at io.debezium.connector.common.BaseSourceTask.validateAndLoadSchemaHistory(BaseSourceTask.java:125)
        at io.debezium.connector.mariadb.MariaDbConnectorTask.start(MariaDbConnectorTask.java:142)
        at

 

kafka와 kafka-connect에 토픽을 자동으로 등록할 수 있도록 설정을 추가하였습니다.

kafka

services:
	kafka:
    	environment:
        ... # 기타 다른 설정들
    	KAFKA_AUTO_CREATE_TOPICS_ENABLE: true  # 토픽 자동 생성 활성화
    	KAFKA_NUM_PARTITIONS: 3  # 기본 파티션 수
    	KAFKA_DEFAULT_REPLICATION_FACTOR: 1  # 기본 복제 팩터

 

kafka-connect

kafka-connect:
      image: quay.io/debezium/connect:3.1.2.Final  # Debezium Kafka Connect 3.1 이미지
      container_name: sequence-kafka-connect
      ports:
        - "8083:8083"  # Kafka Connect REST API 포트
      environment:
      	... # 기타 다른 설정들
        - CONNECT_TOPIC_CREATION_ENABLE=true  # Connect 토픽 생성 활성화

 

결과

데이터 삽입시 메세지가 생성됨.

댓글