1. Kafka란 무엇인가?
Kafka는 대용량 데이터를 실시간으로 처리하기 위한 분산 메시징 시스템입니다.
일반적인 메시지 큐(RabbitMQ, ActiveMQ 등)와 달리, Kafka는 로그·이벤트·메트릭과 같은 데이터를 토픽(Topic) 단위로 묶어 Producer와 Consumer 간에 비동기적으로 교환합니다.
"데이터를 한 번 받아 여러 시스템으로 안정적으로 흘려보내는 중앙 통로"
2. Kafka의 핵심 개념 정리
| 개념 | 설명 |
|---|---|
| Broker | 데이터를 실제 저장·전달하는 서버 단위. 여러 개가 모여 클러스터를 구성. |
| Topic | 데이터가 저장되는 논리적 채널 (예: logs.demo) |
| Partition | 토픽 내부의 병렬 처리 단위. 병렬성과 순서를 함께 제어. |
| Producer | Kafka로 데이터를 전송하는 쪽 (예: Fluent Bit) |
| Consumer | Kafka에서 데이터를 읽는 쪽 (예: Fluentd) |
| Offset | 파티션 내에서 메시지의 고유 번호. 재시작 시 위치 복원에 사용. |
| Replication | 데이터 복제. 하나의 브로커 장애 시에도 메시지 손실 방지. |
| Consumer Group | 여러 Consumer가 협력하여 데이터를 병렬 처리하기 위한 그룹. |
Kafka는 내부적으로 파일시스템에 데이터를 Append-Only 로그 형태로 저장하기 때문에 매우 빠르고, 디스크 기반임에도 높은 처리량을 유지할 수 있습니다.
3. AWS MSK란?
Amazon MSK (Managed Streaming for Apache Kafka) 는 Kafka 클러스터의 설치, 구성, 업그레이드, 모니터링을 자동화한 완전관리형 서비스입니다.
직접 EC2에 Kafka를 설치하지 않아도 되며, TLS + SASL 보안통신, CloudWatch 모니터링, Auto Healing, Snapshot 복원까지 운영 레벨에서 자동으로 지원됩니다.
"Kafka의 운영지옥을 AWS가 대신해주는 서비스"
4. MSK 클러스터 아키텍처
5. MSK 구축 및 클러스터 설정
(1) 콘솔에서 클러스터 생성
- Create cluster → Provisioned 선택
- Kafka version:
3.8.0 - Instance type:
kafka.m5.large - Brokers per AZ: 1개 (총 3개)
- Authentication: TLS + SASL/SCRAM-SHA-512
- Monitoring: CloudWatch + OpenMonitoring 활성화
- 생성 완료 후 상태 Active 확인
(2) 사용자 계정 등록 (AWS Secrets Manager)
MSK의 SASL/SCRAM 인증은 kafka-configs.sh --zookeeper 방식이 아니라, AWS Secrets Manager를 통해 자격증명을 관리합니다. Secret 이름은 반드시 AmazonMSK_ prefix로 시작해야 합니다.
# 1. Secrets Manager에 SCRAM 자격증명 생성
aws secretsmanager create-secret \
--name AmazonMSK_msk_user \
--secret-string '{"username": "msk_user", "password": "msk_password"}'
# 2. 생성된 Secret을 MSK 클러스터에 연결
aws kafka batch-associate-scram-secret \
--cluster-arn <cluster-arn> \
--secret-arn-list <secret-arn>AWS Secrets Manager를 통해 자격증명을 관리하면 자동 로테이션, 감사 로그, 권한 제어 등을 함께 활용할 수 있습니다.
6. EC2에서 Kafka CLI 환경 구성
Java 설치
sudo apt update
sudo apt install -y openjdk-11-jdk
java -versionKafka CLI 다운로드
wget https://archive.apache.org/dist/kafka/3.8.0/kafka_2.13-3.8.0.tgz
tar -xzf kafka_2.13-3.8.0.tgz
cd kafka_2.13-3.8.0환경 변수 등록:
echo 'export KAFKA_HOME=$HOME/kafka_2.13-3.8.0' >> ~/.bashrc
echo 'export PATH=$PATH:$KAFKA_HOME/bin' >> ~/.bashrc
source ~/.bashrc7. 인증 설정 (client.properties)
Kafka CLI가 MSK에 접속할 수 있도록 설정 파일을 작성합니다:
cat > client.properties <<'EOF'
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="msk_user" password="msk_password";
ssl.truststore.location=/usr/lib/jvm/java-11-openjdk-amd64/lib/security/cacerts
ssl.truststore.password=changeit
EOF8. Kafka 토픽 생성 및 메시지 송수신 테스트
토픽 생성
kafka-topics.sh \
--create \
--bootstrap-server \
b-1.testmsk.cf4eh5.c4.kafka.ap-northeast-2.amazonaws.com:9096,\
b-2.testmsk.cf4eh5.c4.kafka.ap-northeast-2.amazonaws.com:9096,\
b-3.testmsk.cf4eh5.c4.kafka.ap-northeast-2.amazonaws.com:9096 \
--replication-factor 3 \
--partitions 3 \
--topic logs.demo \
--command-config client.propertiesProducer (전송)
kafka-console-producer.sh \
--bootstrap-server <brokers> \
--topic logs.demo \
--producer.config client.properties입력 예시:
{"@timestamp": "2025-10-22T14:10:00Z", "agent": "fg-sejong-01", "message": "VPN session started"}Consumer (수신)
kafka-console-consumer.sh \
--bootstrap-server <brokers> \
--topic logs.demo \
--from-beginning \
--consumer.config client.properties \
--max-messages 5출력 예시:
{"@timestamp":"2025-10-22T14:10:00Z","agent":"fg-sejong-01","message":"VPN session started"}9. Fluent Bit → Kafka → Fluentd 파이프라인 테스트
Fluent Bit 설정
[OUTPUT]
Name kafka
Match *
Brokers b-1.testmsk.cf4eh5.c4.kafka.ap-northeast-2.amazonaws.com:9096
Topics logs.demo
rdkafka.security.protocol SASL_SSL
rdkafka.sasl.mechanisms SCRAM-SHA-512
rdkafka.sasl.username msk_user
rdkafka.sasl.password msk_password
Format json
Timestamp_Key @timestampFluent Bit의 Kafka output은 내부적으로 librdkafka를 사용합니다. SCRAM 인증을 사용하려면 Fluent Bit 빌드 시
libsasl2지원이 포함되어 있어야 합니다.
Fluentd 설정 (Consumer)
<source>
@type kafka_group
brokers b-1.testmsk.cf4eh5.c4.kafka.ap-northeast-2.amazonaws.com:9096,...
topics logs.demo
consumer_group fd-consumer-1
ssl_ca_certs_from_system true
sasl_over_ssl true
scram_mechanism sha512
username msk_user
password msk_password
format json
add_prefix parse
</source>
<match parse.logs.demo>
@type stdout
</match>결과
Fluent Bit → Kafka → Fluentd → stdout 까지 전체 파이프라인에서 메시지가 정상적으로 수신·파싱됨을 확인할 수 있습니다.
이제 Fluentd의 Output을 OpenSearch로 전환하면 완전한 실시간 로그 파이프라인이 완성됩니다.
마무리
이번 실습을 통해 다음 내용을 확인했습니다:
- Kafka의 핵심 동작 구조 (Producer–Broker–Consumer)
- AWS MSK 클러스터 구축 및 보안 설정
- Kafka CLI를 통한 토픽 생성·메시지 테스트
- Fluent Bit → Kafka → Fluentd 파이프라인의 실제 데이터 흐름
"Kafka는 단순한 메시지 브로커가 아니라, 시스템 간 데이터를 연결하는 '실시간 데이터 버스'다."