[아키텍쳐] 월배치 Lambda 함수 자동화 - 이메일 알람까지
개요
- 매월 1일 오전 1시에 각 빌딩별 분석 배치 실행
흐름도
-
EventBridge : cron job으로 매월 1일에 오전 1시에 배치작업을 트리거함
- stat-prep : 마스터 테이블 동기화.
- 빌딩 목록 최신화하여 저장(stat_dimension) : 새로 추가된 빌딩이 있는 경우 빌딩 마스터 정보 저장
- 통계 주기 최신화하여 저장(stat_period)
- 해당 작업 완료 시, 분석 코드 병렬 invoke
- stat-analysis: NLP 분석하여 건물별 분석 결과 DB 저장
- java 환경 설정
- 전달의 1일~말일로 필터링 날짜 설정
- 분석대상 데이터 가져오기
- NLP 분석 및 분석 결과 적재
- 이 외 자원
- psycopg2-layer, konlpy-layer : 파이썬 라이브러리 계층
- java_package : java JVM 환경 설정
- 분석 코드랑 분석 환경은 다음에 다뤄보기로 하고 자주 쓰는 데이터 파이프라인 설정을 정리하고자 한다.
배치 스케줄 작업 (EventTrigger)
- EventTrigger를 생성하여 Lambda Event에 추가해주면 된다.
-
EventTrigger Cron식
0 16 L * ? *
- 매월 마지막 날, UTC 기준 오후 4시 0분에 실행 = KST 기준 다음 날(매월 1일) 새벽 1시 0분에 실행
- AWS EventTrigger 서버 시간은 항상 UTC이므로 KST에 맞춰 표시하는 것이 중요하다. 이는 Lambda 내에서 시간 관련 작업을 할 때도 마찬가지.
Lambda 함수에서 다른 Lambda 함수 실행하기
- stat-prep함수의 작업이 끝난 후 분석 모듈인 stat-analysis를 실행하도록 한다.
-
stat-prep Lambda 함수 코드
from datetime import datetime, timedelta from dateutil.relativedelta import relativedelta from zoneinfo import ZoneInfo # KST 변환용 import os import psycopg2 import boto3 import json def lambda_handler(event, context): # Lambda 클라이언트 생성 lambda_client = boto3.client('lambda') try: # 1. building 테이블 변경사항을 stat_dimension에 저장 insert_stat_dimension(cur) # 2. 직전 달의 기간을 stat_period에 저장 insert_stat_period(cur) conn.commit() # 3. 분석 함수 invoke event_data = {"message": "Test message from prep"} invoke_lambda(lambda_client, 'stat-analysis-01', event_data) invoke_lambda(lambda_client, 'stat-analysis-02', event_data) except Exception as e: conn.rollback() print("Error:", e) finally: cur.close() conn.close() def insert_stat_dimension(cur): ## 중략 def insert_stat_period(cur): # UTC -> KST 변환 utc_now = datetime.utcnow() today = utc_now.replace(tzinfo=ZoneInfo("UTC")).astimezone(ZoneInfo("Asia/Seoul")) ## 중략 def invoke_lambda(lambda_client, function_name, event): try: response = lambda_client.invoke( FunctionName=function_name, InvocationType='Event', # 테스트시에는 'RequestResponse'로 테스트 Payload=json.dumps(event) ) print(f"Invoked {function_name} with response: {response}") except Exception as e: print(f"Error invoking {function_name}: {e}")
분석 후 이메일 알람 보내기(SNS)
- Lambda에서 에러가 발생 하거나 혹은 알람을 보내고 싶을 때 SNS(Simple Notification Service)를 연동하면 된다.
- 선제조건 : SNS 구독 등록
-
Lambda 함수 코드
import json import boto3 import psycopg2 import csv import os from io import StringIO from datetime import datetime, timedelta, timezone KST = timezone(timedelta(hours=9)) S3_BUCKET = os.environ['S3_BUCKET'] SNS_TOPIC_ARN = os.environ['SNS_TOPIC_ARN'] # SNS 주제 ARN def send_sns_message(message): sns_client = boto3.client('sns') response = sns_client.publish( TopicArn=SNS_TOPIC_ARN, Message=message ) return response def lambda_handler(event, context): s3 = boto3.client('s3') ## 중략 : 분석 코드 ############## output_file_key = f'stat/{start_date}-{end_date}.csv' s3.put_object(Bucket=S3_BUCKET, Key=output_file_key, Body=csv_buffer.getvalue().encode('utf-8-sig')) # S3 URL 생성 (콘솔 로그인 후 접근 가능) s3_url = f"https://{S3_BUCKET}.s3.amazonaws.com/{output_file_key}" message = f"분석 리포트가 업로드되었습니다. 링크 클릭 시 다운로드: {s3_url}" # SNS로 메시지 전송 send_sns_message(message) return { 'statusCode': 200, 'body': json.dumps('Success') }