데이터 엔지니어 기술 블로그

[⚡AWS] 키네시스(Kinesis): 실시간 데이터 스트림 본문

AWS

[⚡AWS] 키네시스(Kinesis): 실시간 데이터 스트림

jun_yeong_park 2021. 6. 11. 16:29
반응형

Kinesis란?

Kinesis는 실시간으로 데이터 스트림을 수집, 처리, 분석해주는 서비스이다.

 

Data Streams

Kinesis Data Streams

  • 데이터 스트림 수집 및 저장
  • 샤드의 수를 조절하여 스트림을 얼마나 받을지 조절할 수 있다.

 

 

Data Firehose

데이터 스트림 처리 및 전송

Data Firehose

Data Analytics

  • 스트리밍 데이터 분석
  • 실시간 분석 생성 – 지표를 계산하고, Kinesis를 통해 Amazon S3 또는 Amazon Redshift로 전송할 수 있다.
  • 실시간 대시보드 제공 – 집계 및 처리된 스트리밍 데이터 결과를 전송하여 실시간 대시보드를 구성할 수 있다.
  • 실시간 지표 생성 – 실시간 모니터링, 알림, 경보에 사용할 사용자 지정 지표와 트리거를 생성할 수 있다.

Video Streams

  • 재생 및 분석을 위해 미디어 스트림을 캡처, 저장 및 처리

 

Data Streams

용어 정리

  • Data Record
    • 데이터 레코드는 데이터 스트림에서 사용되는 메세지를 말한다.
    • sequence number, partition key, data blob 등으로 이루어져 있으며 하나에 1MB 까지 사용할 수 있다.
    • 한 번 스트림에 들어가면 변경이 불가능하다.
  • Retention Period
    • 데이터 레코드의 보존 기간은 기본적으로 24시간이다.
    • 최소 24시간이며 최대 365일까지 증가시킬 수 있으나 요금이 추가된다.
  • Producer
    • 스트림에 데이터를 보내준다.
  • Consumer
    • 스트림에 있는 데이터를 가져와서 사용한다.
  • Shard
    • 샤드는 데이터 스트림의 단위를 말한다. 하나의 스트림은 하나 또는 그 이상의 샤드로 구성될 수 있다.
    • 하나의 샤드는 읽을 때 최대 5개의 트랜잭션을 지원하며 최대 초당 2MB를 읽을 수 있고, 초당 1000개의 레코드를 쓰고 초당 1MB를 쓸 수 있다.
  • Partition Key
    • 파티션 키는 스트림 내에서 샤드별로 데이터를 그룹화하는데에 사용된다.
    • 데이터 레코드에 파티션 키를 지정할 수 있다.
    • 파티션 키로 데이터 레코드가 속할 샤드를 결정할 수 있으며 결정시에는 MD5 Hash 함수를 사용한다.
  • Sequence Number
    • 각각의 데이터 레코드에는 파티션 키에 고유한 Sequence Number가 있다.
  • Application Name
    • 애플리케이션의 식별자
    • DynamoDB, Amazon CloudWatch의 이름으로 사용된다.

 

 

Get Started: 데이터 스트림 생성하고 레코드 생산하고 소비하기

데이터 스트림 생성하기

Kinesis 콘솔에 접속 후 데이터 스트림 생성 버튼을 클릭 한 후 데이터 스트림 이름을 입력하고 샤드 수를 1개로 둔 뒤 생성 버튼을 클릭한다.

 

Kinesis 데이터 스트림 생성하기
Kinesis Data Stream 콘솔의 스트림 정보에 보면 소비자와 생산자가 있다. Data Firehose를 통해 외부로 내보낼 수도 있고, 클라이언트 라이브러리로 개발하여 내보낼 수도 있다.

 

Kinesis Agent로 스트림에 보내기 (Docker)

  1. Kinesis Agent란?
    • 파일 패턴을 모니터링하고 새로운 데이터 레코드를 스트림에 보낸다.
    • 체크포인트를 확인하고, 파일이 변경되었는지 확인하고, 실패시 재시도한다.
    • 모든 데이터를 안정적이고 시기 적절하며 간단한 방식으로 제공한다.
    • Cloudwatch Metrics에 매트릭을 전송한다.
  2. 아래의 코드로 Dockerfile을 만들어서 빌드한다.
    docker build --tag producer-agent-sample .
    FROM ubuntu:20.04
    
    ENV DEBIAN_FRONTEND=noninteractive
    
    RUN apt-get update
    RUN apt-get install openjdk-8-jdk -y
    RUN apt-get install python3 -y
    RUN apt-get install python3-pip -y
    RUN apt-get install git -y
    RUN apt-get install vim -y
    RUN update-alternatives --set java /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java
    WORKDIR /root
    RUN git clone https://github.com/awslabs/amazon-kinesis-agent
    WORKDIR /root/amazon-kinesis-agent
    RUN ./setup --install
  3. app/config/agent.json 파일을 아래와 같이 생성한다
    1. /app/logs/*.log 경로에 있는 파일을 보면서 새롭게 업데이트되면 kinesis로 보내겠다는 뜻이다.
    2. {
        "cloudwatch.emitMetrics": true,
        "kinesis.endpoint": "kinesis.ap-northeast-1.amazonaws.com",
        "firehose.endpoint": "",
        "flows": [
          {
            "filePattern": "/app/logs/*.log",
            "kinesisStream": "sample",
            "partitionKeyOption": "RANDOM"
          }
        ]
      }
    3. https://docs.aws.amazon.com/streams/latest/dev/writing-with-agents.html
  4. app/config/initalize.sh 파일을 아래와 같이 생성한다.
    cp /app/config/agent.json /etc/aws-kinesis/agent.json
    service aws-kinesis-agent start
    tail -f /dev/null​
  5. 도커 실행하기
    docker run -it --rm -d --name producer-agent-sample -v $PWD/app:/app producer-agent-sample bash /app/initalize.sh​
  6. 아래의 코드를 실행시켜 임시 로그 생성하기
    import time
    for i in range(100):
        message = f'Hi, {i}\n'
        with open('app/logs/hello.log', 'a') as fp:
            fp.write(message)
        print(message)
        time.sleep(10)
  7. 아래의 코드로 키네시스 로그를 받아서 출력해본다.
    import boto3
    import time
    
    kinesis_client = boto3.client('kinesis', region_name='ap-northeast-1')
    
    shard_id_list = [shard['ShardId'] for shard in kinesis_client.describe_stream(StreamName='sample')['StreamDescription']['Shards']]
    first_shard_id = shard_id_list[0]
    
    
    shard_iterator = kinesis_client.get_shard_iterator(StreamName='sample', ShardId=first_shard_id, ShardIteratorType='LATEST')['ShardIterator']
    while True:
        out = kinesis_client.get_records(ShardIterator=shard_iterator, Limit=2)
        print('. ', end='')
        if len(out['Records']):
            print()
            print(out['Records'])
        shard_iterator = out['NextShardIterator']
        time.sleep(5)​
  8. 새로운 메세지가 파일에 쓰이면 아래와 같은 메세지를 받아볼 수 있다.
    [{'SequenceNumber': '49619037467868722068071923439356131850930233483901009922',
      'ApproximateArrivalTimestamp': datetime.datetime(2021, 6, 9, 7, 3, 8, 509000, tzinfo=tzlocal()),
      'Data': b'Hi, 17\n',
      'PartitionKey': '881348.8485124188'}]​

 

 

KPL로 스트림에 보내기

KPL은 Kinesis Producer Library의 약자이다.

  • EC2 인스턴스가 수천개의 디바이스에서 이벤트를 수집한다고 했을 때 필요한 처리량을 달성하기 위해서 멀티 스레딩과 같은 복잡한 작업을 해야하는데 이 작업을 해줄 수 있다.
  • KCL을 사용하는 경우 KPL은 쉽게 통합될 수 있다.
  • Cloudwatch에서 쉽게 모니터링 및 분석을 할 수 있다.
  • 비동기로 동작한다.
  • Java KPL Example
    KinesisProducer kinesis = new KinesisProducer();  
    for (int i = 0; i < 200; ++i) {
        ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8"));
        kinesis.addUserRecord("myStream", "myPartitionKey", data); 
    }

 

 

Boto3를 사용하여 스트림에 보내기 (put_record, put_records)

Kinesis - Boto3 Docs 1.17.91 documentation

 

Kinesis — Boto3 Docs 1.17.92 documentation

The response of this operation contains an EventStream member. When iterated the EventStream will yield events based on the structure below, where only one of the top level keys will be present for any given event. Response Syntax { 'EventStream': EventStr

boto3.amazonaws.com

 

추가

아래의 명령어로 키네시스 로그를 확인해 볼 수 있다.

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

 

Kinesis Auto Scaling

아래의 라이브러리를 사용하면 EC2 Auto Scaling과 동일한 방식으로 샤드 수를 줄이고 늘릴 수 있다.

awslabs/amazon-kinesis-scaling-utils

 

 

Kinesis vs Kafk

키네시스와 카프카는 아래와 같이 비슷한 점이 아주 많다.

Concepts Kafka Kinesis
스토리지 파티션 샤드
보관 기간 설정 가능 1일 ~ 7일 (기본값 1일)
데이터 크기 설정 가능(기본값 1 MB) 최대 1MB
파티션 / 샤드 생성 제한 제한 없음 200개 가능(ap-northeast 기준)
복제 클러스터 안에서 복제 3개 지역에 자동 복제
메세지 전달 횟수 Kafka: 적어도 한 번 전송, Kafka Stremas: 정확히 1회 전송 적어도 1회 전송
의존성 Zookepper DynamoDB
관리 많은 관리 필요 AWS에서 자동 관리

 

반응형
Comments