환경
mariadb : 11.7
kafka-connect : debezium connect : 3.1.2
kafka: cp-kafka 7.9.1 버전 이미지, kraft모드
mariadb : 11.7
참고
debezium 2.9이상부터는 mariadb connector와 mysqcl connector가 분리되었습니다.
2.7버전 이상부터 debezium은 docker-hub가 아닌 quay.io를 통해 배포됩니다.
1. 컨테이너 생성
mairadb, kafka, kafka-ui(선택), kafka-connect 컨테이너를 생성합니다.
services:
mariadb:
image: mariadb:latest
container_name: seqeunce-mariadb-server
environment:
MYSQL_ROOT_PASSWORD: 1234
MYSQL_DATABASE: sequence
MYSQL_USER: sequence123
MYSQL_PASSWORD: sequence123
command: >
--server-id=1
--binlog-format=ROW
--log-bin=mysql-bin
--binlog_legacy_event_pos=ON
--binlog_annotate_row_events=ON
ports:
- "3307:3306"
networks:
- sequence_default # mariadb와 동일한 외부 네트워크 사용
kafka:
image: confluentinc/cp-kafka:latest # Kafka 버전, KRaft 모드 지원 이미지
container_name: sequence-kafka
ports:
- "9092:9092" # Kafka 브로커 접근 포트 (클라이언트용)
environment:
KAFKA_NODE_ID: 1 # KRaft 노드 ID
KAFKA_PROCESS_ROLES: 'broker,controller' # 역할 지정
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@sequence-kafka:29093' # 컨트롤러 투표 노드
KAFKA_LISTENERS: 'PLAINTEXT://:19092,CONTROLLER://:29093,PLAINTEXT_HOST://0.0.0.0:29092,EXTERNAL://:9092' # 리스너 정의
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka:19092,EXTERNAL://localhost:9092' # 브로커가 광고할 리스너들
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,EXTERNAL:PLAINTEXT' # 보안 프로토콜 매핑
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' # 컨트롤러용 리스너
KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT' # 브로커 간 통신용 리스너
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
CLUSTER_ID: 'ciWo7IWazngRchmPES6q5A==' # 고정된 클러스터 ID
KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs' # 로그 디렉토리
KAFKA_AUTO_CREATE_TOPICS_ENABLE: true # 토픽 자동 생성 활성화
KAFKA_NUM_PARTITIONS: 3 # 기본 파티션 수
KAFKA_DEFAULT_REPLICATION_FACTOR: 1 # 기본 복제 팩터
networks:
- sequence_default # mariadb와 동일한 외부 네트워크 사용
kafka-connect:
image: quay.io/debezium/connect:3.1.2.Final # Debezium Kafka Connect 3.1 이미지
container_name: sequence-kafka-connect
ports:
- "8083:8083" # Kafka Connect REST API 포트
environment:
- BOOTSTRAP_SERVERS=kafka:19092 # Kafka 브로커 주소
- GROUP_ID=1 # 커넥트 클러스터 그룹 ID
- CONFIG_STORAGE_TOPIC=connect-configs # 커넥터 설정 저장용 내부 토픽
- OFFSET_STORAGE_TOPIC=connect-offsets # 커넥터 오프셋 저장 토픽
- STATUS_STORAGE_TOPIC=connect-status # 커넥터 상태 저장 토픽
- KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter # 메시지 키 JSON 직렬화
- VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter # 메시지 값 JSON 직렬화
- CONFIG_STORAGE_REPLICATION_FACTOR=1 # 설정 토픽 복제 수
- OFFSET_STORAGE_REPLICATION_FACTOR=1 # 오프셋 토픽 복제 수
- STATUS_STORAGE_REPLICATION_FACTOR=1 # 상태 토픽 복제 수
- CONNECT_TOPIC_CREATION_ENABLE=true # Connect 토픽 생성 활성화
depends_on:
- kafka # Kafka가 먼저 실행되어야 함
networks:
- sequence_default
kafka-ui:
image: provectuslabs/kafka-ui:latest # Kafka UI 최신 이미지
container_name: sequence-kafka-ui
ports:
- "8084:8080" # Kafka UI 접속 포트
environment:
- KAFKA_CLUSTERS_0_NAME=local-kafka # Kafka UI에서 표시될 클러스터 이름
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:19092 # 연결할 Kafka 브로커 주소
depends_on:
- kafka
networks:
- sequence_default
volumes:
kafka_data: # Kafka 로그/메타데이터용 볼륨
mariadb_data:/var/lib/mysql
networks:
sequence_default:
external: true # 외부에서 생성한 sequence 네트워크 사용
2. MariaDB connector 설정파일 작성
mariadb-connector.json 파일을 작성합니다. 해당 파일을 api 형태로 kafka-connect 서버에 전송하기 때문에 파일 위치는 상관없습니다.
{
"name": "mariadb-connector",
"config": {
"connector.class": "io.debezium.connector.mariadb.MariaDbConnector",
"database.hostname": "sequence-mariadb-server",
"database.port": "3306",
"database.user": "debezium-user",
"database.password": "password",
"database.server.id": "1",
"database.server.name": "sequence",
"database.include.list": "sequence",
"database.useGtid": "false",
"table.include.list": "sequence.report",
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
"schema.history.internal.kafka.topic": "schemahistory.sequence",
"schema.history.internal.recovery.attempts": "3",
"include.schema.changes": "true",
"snapshot.mode": "initial",
"database.connectionTimeZone": "Asia/Seoul",
"database.allowPublicKeyRetrieval": "true",
"database.sslMode": "disable",
"database.history.store.only.captured.tables.ddl": "true",
"topic.prefix": "dbhistory"
}
}
해당 설정파일을 사용하여 mariadb-connector등록합니다
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d @mariadb-connector.json
등록완료(제대로 등록됐는지 확인하기 위해서는 kafka-connect의 로그를 확인해보아야합니다)
{"name":"mariadb-connector","config":{"connector.class":"io.debezium.connector.mariadb.MariaDbConnector","database.hostname":"sequence-mariadb-server","database.port":"3306","database.user":"kafka","database.password":"kafka","database.server.id":"1","database.server.name":"sequence","database.include.list":"sequence","table.include.list":"sequence.report","database.history.kafka.bootstrap.servers":"kafka:9092","database.history.kafka.topic":"dbhistory.sequence","include.schema.changes":"true","snapshot.mode":"initial","database.connectionTimeZone":"Asia/Seoul","database.allowPublicKeyRetrieval":"true","database.sslMode":"disable","database.history.store.only.captured.tables.ddl":"true","topic.prefix":"dbhistory","name":"mariadb-connector"},"tasks":[],"type":"source"}
참고
connector 삭제 명령어
curl -X DELETE http://localhost:8083/connectors/mariadb-connector
🚨주의
토픽을 자동으로 생성하지 못해 커넥터 등록에서 계속해서 오류가 발생하였습니다.
2025-06-05 12:39:04,880 ERROR || WorkerSourceTask{id=mariadb-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted [org.apache.kafka.connect.runtime.WorkerTask]
io.debezium.DebeziumException: The db history topic is missing. You may attempt to recover it by reconfiguring the connector to recovery.
at io.debezium.connector.common.BaseSourceTask.validateAndLoadSchemaHistory(BaseSourceTask.java:125)
at io.debezium.connector.mariadb.MariaDbConnectorTask.start(MariaDbConnectorTask.java:142)
at
kafka와 kafka-connect에 토픽을 자동으로 등록할 수 있도록 설정을 추가하였습니다.
kafka
services:
kafka:
environment:
... # 기타 다른 설정들
KAFKA_AUTO_CREATE_TOPICS_ENABLE: true # 토픽 자동 생성 활성화
KAFKA_NUM_PARTITIONS: 3 # 기본 파티션 수
KAFKA_DEFAULT_REPLICATION_FACTOR: 1 # 기본 복제 팩터
kafka-connect
kafka-connect:
image: quay.io/debezium/connect:3.1.2.Final # Debezium Kafka Connect 3.1 이미지
container_name: sequence-kafka-connect
ports:
- "8083:8083" # Kafka Connect REST API 포트
environment:
... # 기타 다른 설정들
- CONNECT_TOPIC_CREATION_ENABLE=true # Connect 토픽 생성 활성화
결과
데이터 삽입시 메세지가 생성됨.
'IT 서비스 > 시퀀스' 카테고리의 다른 글
Kafka + CDC 기반 알림 시스템: 구조를 왜 다시 고민하게 되었을까? (0) | 2025.06.07 |
---|---|
[ 시퀀스 백오피스 ] 왜 나는 백오피스 서버에 API 대신 DB를 바라보게 했는가 – 실시간 알림 설계 고민기 (1) | 2025.06.04 |
트러블 슈팅 - Project의 bookmarkCount 컬럼이 null로 초기화됨 (2) | 2025.05.29 |
프론트에서 서버로 쿠키 전달 되지 않음(SameSite 정책 문제) (1) | 2025.04.07 |
10만개의 유저 Mock Data를 넣어보자 with 멀티 스레드 (0) | 2025.03.16 |
댓글