반응형
Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
Tags
- 카프카 구축
- spark streaming
- Parquet
- 데이터 엔지니어링
- s3
- 레드시프트
- 카프카
- 델타레이크
- Data engineering
- 에어플로우
- 데이터 웨어하우스
- spark
- 대용량 처리
- AWS
- 스파크
- kafka
- Redshift
- Zookeeper
- 데이터 엔지니어
- 데이터
- Data Engineer
- kafka rest api
- docker
- 스파크 스트리밍
- Data Warehouse
- Schema Registry
- airflow
- 컬럼 기반
- delta lake
- MySQL
Archives
- Today
- Total
데이터 엔지니어 기술 블로그
[⚡AWS] 키네시스(Kinesis): 실시간 데이터 스트림 본문
반응형
Kinesis란?
Kinesis는 실시간으로 데이터 스트림을 수집, 처리, 분석해주는 서비스이다.
Data Streams
- 데이터 스트림 수집 및 저장
- 샤드의 수를 조절하여 스트림을 얼마나 받을지 조절할 수 있다.
Data Firehose
데이터 스트림 처리 및 전송
Data Analytics
- 스트리밍 데이터 분석
- 실시간 분석 생성 – 지표를 계산하고, Kinesis를 통해 Amazon S3 또는 Amazon Redshift로 전송할 수 있다.
- 실시간 대시보드 제공 – 집계 및 처리된 스트리밍 데이터 결과를 전송하여 실시간 대시보드를 구성할 수 있다.
- 실시간 지표 생성 – 실시간 모니터링, 알림, 경보에 사용할 사용자 지정 지표와 트리거를 생성할 수 있다.
Video Streams
- 재생 및 분석을 위해 미디어 스트림을 캡처, 저장 및 처리
Data Streams
용어 정리
- Data Record
- 데이터 레코드는 데이터 스트림에서 사용되는 메세지를 말한다.
- sequence number, partition key, data blob 등으로 이루어져 있으며 하나에 1MB 까지 사용할 수 있다.
- 한 번 스트림에 들어가면 변경이 불가능하다.
- Retention Period
- 데이터 레코드의 보존 기간은 기본적으로 24시간이다.
- 최소 24시간이며 최대 365일까지 증가시킬 수 있으나 요금이 추가된다.
- Producer
- 스트림에 데이터를 보내준다.
- Consumer
- 스트림에 있는 데이터를 가져와서 사용한다.
- Shard
- 샤드는 데이터 스트림의 단위를 말한다. 하나의 스트림은 하나 또는 그 이상의 샤드로 구성될 수 있다.
- 하나의 샤드는 읽을 때 최대 5개의 트랜잭션을 지원하며 최대 초당 2MB를 읽을 수 있고, 초당 1000개의 레코드를 쓰고 초당 1MB를 쓸 수 있다.
- Partition Key
- 파티션 키는 스트림 내에서 샤드별로 데이터를 그룹화하는데에 사용된다.
- 데이터 레코드에 파티션 키를 지정할 수 있다.
- 파티션 키로 데이터 레코드가 속할 샤드를 결정할 수 있으며 결정시에는 MD5 Hash 함수를 사용한다.
- Sequence Number
- 각각의 데이터 레코드에는 파티션 키에 고유한 Sequence Number가 있다.
- Application Name
- 애플리케이션의 식별자
- DynamoDB, Amazon CloudWatch의 이름으로 사용된다.
Get Started: 데이터 스트림 생성하고 레코드 생산하고 소비하기
데이터 스트림 생성하기
Kinesis 콘솔에 접속 후 데이터 스트림 생성 버튼을 클릭 한 후 데이터 스트림 이름을 입력하고 샤드 수를 1개로 둔 뒤 생성 버튼을 클릭한다.
Kinesis Agent로 스트림에 보내기 (Docker)
- Kinesis Agent란?
- 파일 패턴을 모니터링하고 새로운 데이터 레코드를 스트림에 보낸다.
- 체크포인트를 확인하고, 파일이 변경되었는지 확인하고, 실패시 재시도한다.
- 모든 데이터를 안정적이고 시기 적절하며 간단한 방식으로 제공한다.
- Cloudwatch Metrics에 매트릭을 전송한다.
- 아래의 코드로 Dockerfile을 만들어서 빌드한다.
docker build --tag producer-agent-sample .
FROM ubuntu:20.04 ENV DEBIAN_FRONTEND=noninteractive RUN apt-get update RUN apt-get install openjdk-8-jdk -y RUN apt-get install python3 -y RUN apt-get install python3-pip -y RUN apt-get install git -y RUN apt-get install vim -y RUN update-alternatives --set java /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java WORKDIR /root RUN git clone https://github.com/awslabs/amazon-kinesis-agent WORKDIR /root/amazon-kinesis-agent RUN ./setup --install
- app/config/agent.json 파일을 아래와 같이 생성한다
- /app/logs/*.log 경로에 있는 파일을 보면서 새롭게 업데이트되면 kinesis로 보내겠다는 뜻이다.
-
{ "cloudwatch.emitMetrics": true, "kinesis.endpoint": "kinesis.ap-northeast-1.amazonaws.com", "firehose.endpoint": "", "flows": [ { "filePattern": "/app/logs/*.log", "kinesisStream": "sample", "partitionKeyOption": "RANDOM" } ] }
- https://docs.aws.amazon.com/streams/latest/dev/writing-with-agents.html
- app/config/initalize.sh 파일을 아래와 같이 생성한다.
cp /app/config/agent.json /etc/aws-kinesis/agent.json service aws-kinesis-agent start tail -f /dev/null
- 도커 실행하기
docker run -it --rm -d --name producer-agent-sample -v $PWD/app:/app producer-agent-sample bash /app/initalize.sh
- 아래의 코드를 실행시켜 임시 로그 생성하기
import time for i in range(100): message = f'Hi, {i}\n' with open('app/logs/hello.log', 'a') as fp: fp.write(message) print(message) time.sleep(10)
- 아래의 코드로 키네시스 로그를 받아서 출력해본다.
import boto3 import time kinesis_client = boto3.client('kinesis', region_name='ap-northeast-1') shard_id_list = [shard['ShardId'] for shard in kinesis_client.describe_stream(StreamName='sample')['StreamDescription']['Shards']] first_shard_id = shard_id_list[0] shard_iterator = kinesis_client.get_shard_iterator(StreamName='sample', ShardId=first_shard_id, ShardIteratorType='LATEST')['ShardIterator'] while True: out = kinesis_client.get_records(ShardIterator=shard_iterator, Limit=2) print('. ', end='') if len(out['Records']): print() print(out['Records']) shard_iterator = out['NextShardIterator'] time.sleep(5)
- 새로운 메세지가 파일에 쓰이면 아래와 같은 메세지를 받아볼 수 있다.
[{'SequenceNumber': '49619037467868722068071923439356131850930233483901009922', 'ApproximateArrivalTimestamp': datetime.datetime(2021, 6, 9, 7, 3, 8, 509000, tzinfo=tzlocal()), 'Data': b'Hi, 17\n', 'PartitionKey': '881348.8485124188'}]
KPL로 스트림에 보내기
KPL은 Kinesis Producer Library의 약자이다.
- EC2 인스턴스가 수천개의 디바이스에서 이벤트를 수집한다고 했을 때 필요한 처리량을 달성하기 위해서 멀티 스레딩과 같은 복잡한 작업을 해야하는데 이 작업을 해줄 수 있다.
- KCL을 사용하는 경우 KPL은 쉽게 통합될 수 있다.
- Cloudwatch에서 쉽게 모니터링 및 분석을 할 수 있다.
- 비동기로 동작한다.
- Java KPL Example
KinesisProducer kinesis = new KinesisProducer(); for (int i = 0; i < 200; ++i) { ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8")); kinesis.addUserRecord("myStream", "myPartitionKey", data); }
Boto3를 사용하여 스트림에 보내기 (put_record, put_records)
Kinesis - Boto3 Docs 1.17.91 documentation
추가
아래의 명령어로 키네시스 로그를 확인해 볼 수 있다.
tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log
Kinesis Auto Scaling
아래의 라이브러리를 사용하면 EC2 Auto Scaling과 동일한 방식으로 샤드 수를 줄이고 늘릴 수 있다.
awslabs/amazon-kinesis-scaling-utils
Kinesis vs Kafk
키네시스와 카프카는 아래와 같이 비슷한 점이 아주 많다.
Concepts | Kafka | Kinesis |
스토리지 | 파티션 | 샤드 |
보관 기간 | 설정 가능 | 1일 ~ 7일 (기본값 1일) |
데이터 크기 | 설정 가능(기본값 1 MB) | 최대 1MB |
파티션 / 샤드 생성 제한 | 제한 없음 | 200개 가능(ap-northeast 기준) |
복제 | 클러스터 안에서 복제 | 3개 지역에 자동 복제 |
메세지 전달 횟수 | Kafka: 적어도 한 번 전송, Kafka Stremas: 정확히 1회 전송 | 적어도 1회 전송 |
의존성 | Zookepper | DynamoDB |
관리 | 많은 관리 필요 | AWS에서 자동 관리 |
반응형
'AWS' 카테고리의 다른 글
[⚡AWS] Redshift 상관관계가 있는 서브 쿼리 오류 (0) | 2021.05.27 |
---|---|
[⚡AWS] Redshift에서 View 사용하기 (0) | 2021.05.25 |
[⚡AWS] EC2 스팟 인스턴스 간단한 개념 (0) | 2021.04.30 |
Comments