일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | |||||
3 | 4 | 5 | 6 | 7 | 8 | 9 |
10 | 11 | 12 | 13 | 14 | 15 | 16 |
17 | 18 | 19 | 20 | 21 | 22 | 23 |
24 | 25 | 26 | 27 | 28 | 29 | 30 |
- 에어플로우
- MySQL
- 카프카
- kafka rest api
- 데이터 엔지니어링
- kafka
- Data engineering
- 데이터
- spark streaming
- 컬럼 기반
- 데이터 웨어하우스
- 스파크 스트리밍
- 델타레이크
- delta lake
- Data Warehouse
- Redshift
- Zookeeper
- Schema Registry
- 스파크
- docker
- Data Engineer
- Parquet
- AWS
- 대용량 처리
- 데이터 엔지니어
- 카프카 구축
- airflow
- s3
- spark
- 레드시프트
- Today
- Total
데이터 엔지니어 기술 블로그
[🧙Kafka] 카프카 구축 (4) - 카프카 설정 구성하기 본문
About
카프카를 사용할 때 운영으로 사용하려면 구성을 잘 조절해야 한다. 스트리밍 데이터로 많은 데이터가 들어오고 나가기 때문에 비용 차이, 속도 차이가 클 수 있다. <kafka_directory>/config/server.properties 파일을 수정하여 설정을 적용할 수 있다.
카프카 구성하기
broker.id=1
카프카는 클러스터로 구축할 때 broker.id를 기준으로 브로커를 분류하기 때문에 하나의 클러스터에서 여러개의 broker.id를 지정할 수 없고 각기 다르게 지정해주어야 한다.
port=9092
카프카는 기본 포트로 9092를 사용한다.
num.network.threads=3
네트워크 요청을 처리하는 쓰레드 수 지정할 수 있다.
num.io.threads=8
IO가 발생했을 때 사용되어지는 쓰레드의 개수를 지정할 수 있다.
socket.send.buffer.bytes=102400
전송 버퍼와 소켓 서버가 사용하는 버퍼를 지정할 수 있다. 한 번에 보낼 양을 말한다.
socket.receive.buffer.bytes=102400
수신 버퍼와 소켓 서버가 사용하는 버퍼를 지정할 수 있다. 한 번에 받을 양을 말한다.
socket.request.max.bytes=104857600
소켓 서버가 수락하는 최대 바이트 수를 말한다. 이 값을 조절하면 메모리를 보호할 수 있다.
log.dirs=/tmp/kafka-logs
카프카는 모든 메시지를 로그 세그먼트 파일에 모아서 디스크에 저장한다. 경로를 쉼표로 구분해서 여러개의 경로에 저장할 수 있다.
브로커가 새로운 파티션을 저장할 때 가장 적은 디스크 용량이 아닌 가장 적은 수의 파티션을 저장한 경로를 사용하기 때문에 용량이 항상 고르게 분산되는 것은 아니다.
num.recovery.threads.per.data.dir=1
각 로그 디렉터리를 기준으로 복구 할 때 사용되는 쓰레드의 개수를 말한다. 브로커의 시작과 종료시에만 사용되므로 병행 처리를 하도록 많은 수의 쓰레드를 지정하는 것이 좋다.
logs.dirs가 3개고 이 값이 5라면 15개의 쓰레드가 실행된다.
log.retention.hours=168
카프카가 얼마동안 메시지를 보존할지 정할 수 있다. log.retention.ms 또는 log.retention.minutes 도 사용할 수 있다.
log.segment.bytes=1073741824
카프카는 로그 세그먼트를 기준으로 저장이 되는데, 저장률이 낮다면 이 값을 조절하면 된다. 1073741824는 1GB인데 하루에 100MB씩 채운다면 10일이 걸린 후에 저장이 된다.
특정 타임스탬프 시간으로 파티션의 오프셋을 요청하면 카프카는 그 시간에 해당하는 로그 세그먼트 파일을 찾아서 반환하기 때문에 이 기능에도 영향을 줄 수 있는 값이다.
log.retention.bytes=-1
저장된 메시지들의 전체 크기를 기준으로 보존량을 설정할 수 있다. 하나의 토픽이 4개의 파티션일 때 이 값이 1GB라면 전체 합쳐서 4GB 이상은 사용할 수 없다.
# 5GB
log.retention.bytes=5368709120
message.max.bytes = 1048588
프로듀서가 쓰려는 메시지의 최대 크기를 제한할 수 있다.
이 값은 크게 영향을 주기 때문에 잘 사용해야 한다. 메시지가 커질수록 네트워크를 사용할 수 있는 요청당 작업 시간이 길어지게 된다.
zookeeper.connect=pipeline-zookeeper-a:2181,pipeline-zookeeper-b:2181,pipeline-zookeeper-c:2181
브로커의 메타데이터를 저장할 주키퍼의 위치를 지정할 수 있다. 호스트이름:포트/경로 형식으로 지정할 수 있다.
여기에서는 다수의 주키퍼 서버를 앙상블로 구성해서 사용하고 있는데, 그 이유는 하나에 문제가 생기면 다른 두개로 버티기 위해서이다.
주키퍼 서버는 3개에서 5개 사이로 사용하는 것이 좋은데 너무 많으면 오히려 소요되는 시간이 더 걸리기 때문에 비효율적이다.
chroot 경로를 사용하는데 지정하지 않으면 실제 root의 경로가 사용된다. chroot 경로는 change root 라는 뜻이고 사용하는 이유는 다수의 카프카 클러스터를 사용했을 때 겹쳐서 문제가 발생하는 경우가 있기 때문이다.
zookeeper.connection.timeout.ms=18000
주키퍼에 연결할 때 시간 제한을 두고 그 이상이 되면 더 이상 시도를 하지 않는다.
auto.create.topics.enable=true
카프카에 토픽이 없을 때 Producer가 토픽을 보내거나, Consumer에서 가져오려고 시도할 때 토픽을 자동으로 생성하게 하는 것을 말한다. false로 두는 편이 관리하기 편하다.
num.partitions=1
새로운 토픽이 몇개의 파티션으로 생성되는지를 나타내는데 auto.create.topics.enable=true 일 경우에만 적용이 된다.
파티션 개수는 증가를 시킬 수는 있지만 감소를 시킬수는 없기 때문에 주의해서 사용해야 한다. 지정한다면 서버의 물리적 프로세서 수의 절반을 지정하는 편이 좋다.
기존에 구축했던 카프카 브로커의 설정
advertised.host.name = null
advertised.listeners = null
advertised.port = null
alter.config.policy.class.name = null
alter.log.dirs.replication.quota.window.num = 11
alter.log.dirs.replication.quota.window.size.seconds = 1
authorizer.class.name =
auto.create.topics.enable = true
auto.leader.rebalance.enable = true
background.threads = 10
broker.id = -1
broker.id.generation.enable = true
broker.rack = null
client.quota.callback.class = null
compression.type = producer
connection.failed.authentication.delay.ms = 100
connections.max.idle.ms = 600000
connections.max.reauth.ms = 0
control.plane.listener.name = null
controlled.shutdown.enable = true
controlled.shutdown.max.retries = 3
controlled.shutdown.retry.backoff.ms = 5000
controller.quota.window.num = 11
controller.quota.window.size.seconds = 1
controller.socket.timeout.ms = 30000
create.topic.policy.class.name = null
default.replication.factor = 1
delegation.token.expiry.check.interval.ms = 3600000
delegation.token.expiry.time.ms = 86400000
delegation.token.master.key = null
delegation.token.max.lifetime.ms = 604800000
delete.records.purgatory.purge.interval.requests = 1
delete.topic.enable = true
fetch.max.bytes = 57671680
fetch.purgatory.purge.interval.requests = 1000
group.initial.rebalance.delay.ms = 0
group.max.session.timeout.ms = 1800000
group.max.size = 2147483647
group.min.session.timeout.ms = 6000
host.name =
inter.broker.listener.name = null
inter.broker.protocol.version = 2.7-IV2
kafka.metrics.polling.interval.secs = 10
kafka.metrics.reporters = []
leader.imbalance.check.interval.seconds = 300
leader.imbalance.per.broker.percentage = 10
listener.security.protocol.map = PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
listeners = null
log.cleaner.backoff.ms = 15000
log.cleaner.dedupe.buffer.size = 134217728
log.cleaner.delete.retention.ms = 86400000
log.cleaner.enable = true
log.cleaner.io.buffer.load.factor = 0.9
log.cleaner.io.buffer.size = 524288
log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
log.cleaner.max.compaction.lag.ms = 9223372036854775807
log.cleaner.min.cleanable.ratio = 0.5
log.cleaner.min.compaction.lag.ms = 0
log.cleaner.threads = 1
log.cleanup.policy = [delete]
log.dir = /tmp/kafka-logs
log.dirs = /tmp/kafka-logs
log.flush.interval.messages = 9223372036854775807
log.flush.interval.ms = null
log.flush.offset.checkpoint.interval.ms = 60000
log.flush.scheduler.interval.ms = 9223372036854775807
log.flush.start.offset.checkpoint.interval.ms = 60000
log.index.interval.bytes = 4096
log.index.size.max.bytes = 10485760
log.message.downconversion.enable = true
log.message.format.version = 2.7-IV2
log.message.timestamp.difference.max.ms = 9223372036854775807
log.message.timestamp.type = CreateTime
log.preallocate = false
log.retention.bytes = -1
log.retention.check.interval.ms = 300000
log.retention.hours = 168
log.retention.minutes = null
log.retention.ms = null
log.roll.hours = 168
log.roll.jitter.hours = 0
log.roll.jitter.ms = null
log.roll.ms = null
log.segment.bytes = 1073741824
log.segment.delete.delay.ms = 60000
max.connection.creation.rate = 2147483647
max.connections = 2147483647
max.connections.per.ip = 2147483647
max.connections.per.ip.overrides =
max.incremental.fetch.session.cache.slots = 1000
message.max.bytes = 1048588
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
min.insync.replicas = 1
num.io.threads = 8
num.network.threads = 3
num.partitions = 1
num.recovery.threads.per.data.dir = 1
num.replica.alter.log.dirs.threads = null
num.replica.fetchers = 1
offset.metadata.max.bytes = 4096
offsets.commit.required.acks = -1
offsets.commit.timeout.ms = 5000
offsets.load.buffer.size = 5242880
offsets.retention.check.interval.ms = 600000
offsets.retention.minutes = 10080
offsets.topic.compression.codec = 0
offsets.topic.num.partitions = 50
offsets.topic.replication.factor = 1
offsets.topic.segment.bytes = 104857600
password.encoder.cipher.algorithm = AES/CBC/PKCS5Padding
password.encoder.iterations = 4096
password.encoder.key.length = 128
password.encoder.keyfactory.algorithm = null
password.encoder.old.secret = null
password.encoder.secret = null
port = 9092
principal.builder.class = null
producer.purgatory.purge.interval.requests = 1000
queued.max.request.bytes = -1
queued.max.requests = 500
quota.consumer.default = 9223372036854775807
quota.producer.default = 9223372036854775807
quota.window.num = 11
quota.window.size.seconds = 1
replica.fetch.backoff.ms = 1000
replica.fetch.max.bytes = 1048576
replica.fetch.min.bytes = 1
replica.fetch.response.max.bytes = 10485760
replica.fetch.wait.max.ms = 500
replica.high.watermark.checkpoint.interval.ms = 5000
replica.lag.time.max.ms = 30000
replica.selector.class = null
replica.socket.receive.buffer.bytes = 65536
replica.socket.timeout.ms = 30000
replication.quota.window.num = 11
replication.quota.window.size.seconds = 1
request.timeout.ms = 30000
reserved.broker.max.id = 1000
sasl.client.callback.handler.class = null
sasl.enabled.mechanisms = [GSSAPI]
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.principal.to.local.rules = [DEFAULT]
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism.inter.broker.protocol = GSSAPI
sasl.server.callback.handler.class = null
security.inter.broker.protocol = PLAINTEXT
security.providers = null
socket.connection.setup.timeout.max.ms = 127000
socket.connection.setup.timeout.ms = 10000
socket.receive.buffer.bytes = 102400
socket.request.max.bytes = 104857600
socket.send.buffer.bytes = 102400
ssl.cipher.suites = []
ssl.client.auth = none
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.principal.mapping.rules = DEFAULT
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.abort.timed.out.transaction.cleanup.interval.ms = 10000
transaction.max.timeout.ms = 900000
transaction.remove.expired.transaction.cleanup.interval.ms = 3600000
transaction.state.log.load.buffer.size = 5242880
transaction.state.log.min.isr = 1
transaction.state.log.num.partitions = 50
transaction.state.log.replication.factor = 1
transaction.state.log.segment.bytes = 104857600
transactional.id.expiration.ms = 604800000
unclean.leader.election.enable = false
zookeeper.clientCnxnSocket = null
zookeeper.connect = pipeline-zookeeper-a:2181,pipeline-zookeeper-b:2181,pipeline-zookeeper-c:2181
zookeeper.connection.timeout.ms = 18000
zookeeper.max.in.flight.requests = 10
zookeeper.session.timeout.ms = 18000
zookeeper.set.acl = false
zookeeper.ssl.cipher.suites = null
zookeeper.ssl.client.enable = false
zookeeper.ssl.crl.enable = false
zookeeper.ssl.enabled.protocols = null
zookeeper.ssl.endpoint.identification.algorithm = HTTPS
zookeeper.ssl.keystore.location = null
zookeeper.ssl.keystore.password = null
zookeeper.ssl.keystore.type = null
zookeeper.ssl.ocsp.enable = false
zookeeper.ssl.protocol = TLSv1.2
zookeeper.ssl.truststore.location = null
zookeeper.ssl.truststore.password = null
zookeeper.ssl.truststore.type = null
zookeeper.sync.time.ms = 2000
기타 구성 정보
메모리
카프카는 메모리의 페이지 캐시에 저장이 되기 때문에 메모리에 저장을 잘 할수록 성능이 좋아진다.
카프카의 메모리를 계산해서 설정하는 것은 어렵지만 문서에서는 write_throughput * 30으로 설정하라고 되어있다.
직접 사용해보면서 늘리고 줄이고 하는 편이 좋을 듯 하다.
CPU
메모리보다는 중요하지 않지만, 메시지를 압축하거나 오프셋 지정, 체크섬을 검사하는 등의 작업을 할 때 필요하다. checksum 이란 중복 검사를 말한다.
카프카 클러스터
클러스터에 10TB를 보존해야 한다면 브로커 5개일 경우 하나에 2TB씩 저장하면 된다. 하지만 데이터를 보존하기 위해 replication factor를 사용할 경우 최소 2배가 필요하다.
OS
스와핑이나 파일 시스템 등이 영향을 줄 수 있다.
네트워크
네트워크 트래픽이 많이 발생하기 때문에 중요하게 작용한다.
'데이터 엔지니어링' 카테고리의 다른 글
[🧙Kafka] 카프카 구축 (6) - 프로듀서로 카프카에 메시지 쓰기 (0) | 2021.04.08 |
---|---|
[🧙Kafka] 카프카 구축 (5) - 카프카 UI 간단하게 구성하기 (0) | 2021.04.08 |
[🧙Kafka] 카프카 구축 (3) - 카프카 클러스터 REST API로 사용하기 (0) | 2021.04.07 |
[🧙Kafka] 카프카 구축 (2) - 카프카 도커로 쉽게 설치하기 (0) | 2021.04.06 |
[🧙Kafka] 카프카 구축 (1) - 주키퍼 앙상블 쉽게 구축하기 (2) | 2021.04.06 |