DevTech
2026년 3월 12일 · 22 min read

AWS MSK와 Kafka로 구축하는 로그 파이프라인 — 개념부터 실습까지

1. Kafka란 무엇인가?

Kafka는 대용량 데이터를 실시간으로 처리하기 위한 분산 메시징 시스템입니다.

일반적인 메시지 큐(RabbitMQ, ActiveMQ 등)와 달리, Kafka는 로그·이벤트·메트릭과 같은 데이터를 토픽(Topic) 단위로 묶어 ProducerConsumer 간에 비동기적으로 교환합니다.

"데이터를 한 번 받아 여러 시스템으로 안정적으로 흘려보내는 중앙 통로"


2. Kafka의 핵심 개념 정리

개념 설명
Broker 데이터를 실제 저장·전달하는 서버 단위. 여러 개가 모여 클러스터를 구성.
Topic 데이터가 저장되는 논리적 채널 (예: logs.demo)
Partition 토픽 내부의 병렬 처리 단위. 병렬성과 순서를 함께 제어.
Producer Kafka로 데이터를 전송하는 쪽 (예: Fluent Bit)
Consumer Kafka에서 데이터를 읽는 쪽 (예: Fluentd)
Offset 파티션 내에서 메시지의 고유 번호. 재시작 시 위치 복원에 사용.
Replication 데이터 복제. 하나의 브로커 장애 시에도 메시지 손실 방지.
Consumer Group 여러 Consumer가 협력하여 데이터를 병렬 처리하기 위한 그룹.

Kafka는 내부적으로 파일시스템에 데이터를 Append-Only 로그 형태로 저장하기 때문에 매우 빠르고, 디스크 기반임에도 높은 처리량을 유지할 수 있습니다.


3. AWS MSK란?

Amazon MSK (Managed Streaming for Apache Kafka) 는 Kafka 클러스터의 설치, 구성, 업그레이드, 모니터링을 자동화한 완전관리형 서비스입니다.

직접 EC2에 Kafka를 설치하지 않아도 되며, TLS + SASL 보안통신, CloudWatch 모니터링, Auto Healing, Snapshot 복원까지 운영 레벨에서 자동으로 지원됩니다.

"Kafka의 운영지옥을 AWS가 대신해주는 서비스"


4. MSK 클러스터 아키텍처

flowchart TD A["Fluent Bit Agent (Customer / On-prem)"] B[("AWS MSK Cluster (3 Brokers / 3 AZs)")] C["Fluentd Parser (SaaS Backend Node)"] D[("OpenSearch Index (fortigateutm-YYYY.MM.DD)")] A -->|"TLS + SASL"| B B -->|"Kafka Consumer"| C C -->|"HTTP / JSON"| D

5. MSK 구축 및 클러스터 설정

(1) 콘솔에서 클러스터 생성

  1. Create cluster → Provisioned 선택
  2. Kafka version: 3.8.0
  3. Instance type: kafka.m5.large
  4. Brokers per AZ: 1개 (총 3개)
  5. Authentication: TLS + SASL/SCRAM-SHA-512
  6. Monitoring: CloudWatch + OpenMonitoring 활성화
  7. 생성 완료 후 상태 Active 확인

(2) 사용자 계정 등록 (AWS Secrets Manager)

MSK의 SASL/SCRAM 인증은 kafka-configs.sh --zookeeper 방식이 아니라, AWS Secrets Manager를 통해 자격증명을 관리합니다. Secret 이름은 반드시 AmazonMSK_ prefix로 시작해야 합니다.

# 1. Secrets Manager에 SCRAM 자격증명 생성
aws secretsmanager create-secret \
  --name AmazonMSK_msk_user \
  --secret-string '{"username": "msk_user", "password": "msk_password"}'

# 2. 생성된 Secret을 MSK 클러스터에 연결
aws kafka batch-associate-scram-secret \
  --cluster-arn <cluster-arn> \
  --secret-arn-list <secret-arn>

AWS Secrets Manager를 통해 자격증명을 관리하면 자동 로테이션, 감사 로그, 권한 제어 등을 함께 활용할 수 있습니다.


6. EC2에서 Kafka CLI 환경 구성

Java 설치

sudo apt update
sudo apt install -y openjdk-11-jdk
java -version

Kafka CLI 다운로드

wget https://archive.apache.org/dist/kafka/3.8.0/kafka_2.13-3.8.0.tgz
tar -xzf kafka_2.13-3.8.0.tgz
cd kafka_2.13-3.8.0

환경 변수 등록:

echo 'export KAFKA_HOME=$HOME/kafka_2.13-3.8.0' >> ~/.bashrc
echo 'export PATH=$PATH:$KAFKA_HOME/bin' >> ~/.bashrc
source ~/.bashrc

7. 인증 설정 (client.properties)

Kafka CLI가 MSK에 접속할 수 있도록 설정 파일을 작성합니다:

cat > client.properties <<'EOF'
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="msk_user" password="msk_password";
ssl.truststore.location=/usr/lib/jvm/java-11-openjdk-amd64/lib/security/cacerts
ssl.truststore.password=changeit
EOF

8. Kafka 토픽 생성 및 메시지 송수신 테스트

토픽 생성

kafka-topics.sh \
  --create \
  --bootstrap-server \
    b-1.testmsk.cf4eh5.c4.kafka.ap-northeast-2.amazonaws.com:9096,\
    b-2.testmsk.cf4eh5.c4.kafka.ap-northeast-2.amazonaws.com:9096,\
    b-3.testmsk.cf4eh5.c4.kafka.ap-northeast-2.amazonaws.com:9096 \
  --replication-factor 3 \
  --partitions 3 \
  --topic logs.demo \
  --command-config client.properties

Producer (전송)

kafka-console-producer.sh \
  --bootstrap-server <brokers> \
  --topic logs.demo \
  --producer.config client.properties

입력 예시:

{"@timestamp": "2025-10-22T14:10:00Z", "agent": "fg-sejong-01", "message": "VPN session started"}

Consumer (수신)

kafka-console-consumer.sh \
  --bootstrap-server <brokers> \
  --topic logs.demo \
  --from-beginning \
  --consumer.config client.properties \
  --max-messages 5

출력 예시:

{"@timestamp":"2025-10-22T14:10:00Z","agent":"fg-sejong-01","message":"VPN session started"}

9. Fluent Bit → Kafka → Fluentd 파이프라인 테스트

Fluent Bit 설정

[OUTPUT]
    Name                   kafka
    Match                  *
    Brokers                b-1.testmsk.cf4eh5.c4.kafka.ap-northeast-2.amazonaws.com:9096
    Topics                 logs.demo
    rdkafka.security.protocol  SASL_SSL
    rdkafka.sasl.mechanisms    SCRAM-SHA-512
    rdkafka.sasl.username      msk_user
    rdkafka.sasl.password      msk_password
    Format                 json
    Timestamp_Key          @timestamp

Fluent Bit의 Kafka output은 내부적으로 librdkafka를 사용합니다. SCRAM 인증을 사용하려면 Fluent Bit 빌드 시 libsasl2 지원이 포함되어 있어야 합니다.

Fluentd 설정 (Consumer)

<source>
  @type kafka_group
  brokers b-1.testmsk.cf4eh5.c4.kafka.ap-northeast-2.amazonaws.com:9096,...
  topics logs.demo
  consumer_group fd-consumer-1
  ssl_ca_certs_from_system true
  sasl_over_ssl true
  scram_mechanism sha512
  username msk_user
  password msk_password
  format json
  add_prefix parse
</source>

<match parse.logs.demo>
  @type stdout
</match>

결과

Fluent Bit → Kafka → Fluentd → stdout 까지 전체 파이프라인에서 메시지가 정상적으로 수신·파싱됨을 확인할 수 있습니다.

이제 Fluentd의 Output을 OpenSearch로 전환하면 완전한 실시간 로그 파이프라인이 완성됩니다.


마무리

이번 실습을 통해 다음 내용을 확인했습니다:

  • Kafka의 핵심 동작 구조 (Producer–Broker–Consumer)
  • AWS MSK 클러스터 구축 및 보안 설정
  • Kafka CLI를 통한 토픽 생성·메시지 테스트
  • Fluent Bit → Kafka → Fluentd 파이프라인의 실제 데이터 흐름

"Kafka는 단순한 메시지 브로커가 아니라, 시스템 간 데이터를 연결하는 '실시간 데이터 버스'다."


Sponsored
💬 Comments powered by Giscus