본문 바로가기

꼰대개발자/프로그래밍 언어

AWS WAF 데이터를 DB에 저장하기

WAF의 데이터를 저장하는 여러 방법중 S3가 제일 요금이 적게 나오는 방법이고 데이터를 다운로드 받아서 처리하기에 편리합니다. AWS의 요금체계를 파악하지 못해서 사용하면 요금폭탄을 맞게 됩니다. 항상 주의해야 해요.

 

1. AWS WAF S3 로그 실제 포맷

{
  "timestamp": 1779920495000,
  "formatVersion": 1,
  "webaclId": "arn:aws:wafv2:ap-northeast-2:12345:regional/webacl/alb-waf/a1b2c3d4-...",
  "terminatingRuleId": "Block_Bad_Bots",
  "terminatingRuleType": "REGULAR",
  "action": "BLOCK",
  "terminatingRuleMatchDetails": [],
  "httpLogVersion": "2.0",
  "httpSourceId": "12345-app/alb/...",
  "httpSourceType": "ALB",
  "httpRequest": {
    "clientIp": "57.141.14.41",
    "country": "US",
    "headers": [
      { "name": "Host", "value": "www.bogus.com" },
      { "name": "User-Agent", "value": "meta-externalagent/1.1 (+https://developers.facebook.com/docs/sharing/webmasters/crawler)" },
      { "name": "Accept", "value": "*/*" }
    ],
    "uri": "/view/220992/1104",
    "args": "",
    "httpMethod": "GET",
    "httpVersion": "HTTP/1.1"
  },
  "labels": []
}

 

2. MySQL 테이블 생성

CREATE TABLE `waf_logs` (
	`id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '고유 번호',
	`log_type` ENUM('BLOCK','COUNT') NOT NULL COMMENT '액션 유형' COLLATE 'utf8mb4_general_ci',
	`client_ip` VARCHAR(45) NOT NULL COMMENT '공격자 IPv4/IPv6' COLLATE 'utf8mb4_general_ci',
	`domain` VARCHAR(150) NULL DEFAULT NULL COMMENT '요청 도메인' COLLATE 'utf8mb4_general_ci',
	`uri` VARCHAR(2083) NOT NULL COMMENT '요청 주소' COLLATE 'utf8mb4_general_ci',
	`rule_id` VARCHAR(128) NULL DEFAULT NULL COMMENT '매칭된 WAF 규칙 ID' COLLATE 'utf8mb4_general_ci',
	`labels` VARCHAR(500) NULL DEFAULT NULL COMMENT '진단한 공격 유형' COLLATE 'utf8mb4_general_ci',
	`ua` VARCHAR(500) NULL DEFAULT NULL COMMENT '브라우저 및 봇 정보 (User-Agent)' COLLATE 'utf8mb4_general_ci',
	`waf_timestamp` BIGINT(20) UNSIGNED NOT NULL COMMENT 'AWS WAF 발생 시각 (밀리초 Epoch)',
	`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '서버 DB 인서트 시각',
	PRIMARY KEY (`id`) USING BTREE,
	INDEX `idx_log_type_created` (`log_type`, `created_at`) USING BTREE,
	INDEX `idx_client_ip` (`client_ip`) USING BTREE,
	INDEX `idx_waf_timestamp` (`waf_timestamp`) USING BTREE
)
COMMENT='AWS WAF 차단 및 카운트 로그 수집 테이블'
COLLATE='utf8mb4_general_ci'
ENGINE=InnoDB;

 

3. 데이터 파싱 후 처리

<?php 
// PHP 5.2에서 실행하는 코드임
$target_dir = "/home/waf/data";
$ymdh = date('Y/m/d/H', strtotime("10 hour ago")); // 표준시간 9시간을 빼고 1시간전 = 10시간전
$exec = "/usr/bin/aws s3 cp s3://aws-waf-logs/ap-northeast-2/$ymdh/ $target_dir --recursive --exclude \"*\" --include \"*.gz\"";
$output = shell_exec($exec);

echo "$output \n";

// --- [추가] WAF 로그 재귀 파싱 및 디스크 정리 로직 ---

// 통계용 변수
$file_count = 0;
$block_count = 0;


$hostname = "localhost";
$username = "root";
$userpass = "xxxxxxxxx";
$dbname = "waf";
$dbconn = mysql_connect($hostname, $username, $userpass);
mysql_select_db($dbname);

// PHP 5.2 환경에서 하위 폴더까지 샅샅이 뒤지는 재귀 함수 호출
parse_waf_directory($target_dir, $file_count, $block_count);

mysql_close($dbconn);

echo "\n========================================\n";
echo "[파싱 완료 리포트]\n";
echo "처리한 총 .gz 파일 수: {$file_count} 개\n";
echo "차단(BLOCK)된 누적 건수: {$block_count} 건\n";
echo "========================================\n";


/**
 * 디렉토리를 재귀적으로 탐색하며 .gz 파일을 파싱하고 삭제하는 함수 (PHP 5.2 호환)
 */
function parse_waf_directory($dir, &$file_count, &$block_count) {
	global $dbconn;
	
	if (!is_dir($dir) || !($dh = opendir($dir))) {
		return;
	}
	
	while (($file = readdir($dh)) !== false) {
		// 리눅스 숨김 디렉토리(., ..)는 패스
		if ($file == '.' || $file == '..') {
			continue;
		}
		
		$full_path = $dir . '/' . $file;
		
		// 1. 만약 하위 디렉토리(예: 00, 05, 10 등)라면 함수를 다시 스스로 호출(재귀)
		if (is_dir($full_path)) {
			parse_waf_directory($full_path, $file_count, $block_count);
			
			// [용량 관리] 하위 폴더 내부 파일이 다 삭제되어 빈 폴더가 되었다면 폴더 자체도 삭제
			@rmdir($full_path);
		}
		// 2. 파일이면서 확장자가 .gz 인 경우 파싱 시작
		else if (is_file($full_path) && pathinfo($full_path, PATHINFO_EXTENSION) === 'gz') {
			$file_count++;
			
			// gzopen으로 압축 파일 직접 열기
			$zp = @gzopen($full_path, "r");
			if ($zp) {
				while (($line = gzgets($zp)) !== false) {
					// WAF 로그는 한 줄에 하나씩 JSON 객체로 이루어져 있습니다.
					$log_data = json_decode($line, true);
					
					if (!$log_data) continue;
					
					$action = $log_data['action']; // ALLOW, BLOCK 등 기본 액션
					$is_collected = false;        // 이번 로그를 DB에 저장할지 여부 플래그
					$log_type = '';               // 'BLOCK' 또는 'COUNT' 구분용 변수
					
					// 2-1. 최전방에서 완전히 차단(BLOCK)된 경우 처리
					if ($action === 'BLOCK') {
						$is_collected = true;
						$log_type = 'BLOCK';
						$block_count++; // 기존 BLOCK 카운터 증가
					}
					// 2-2. 차단되지는 않았지만 COUNT 규칙에 걸려 통과한 경우 처리
					else if (isset($log_data['nonTerminatingMatchingRules']) && !empty($log_data['nonTerminatingMatchingRules'])) {
						// COUNT 규칙 배열을 돌면서 실제로 어떤 규칙에 걸렸는지 확인
						foreach ($log_data['nonTerminatingMatchingRules'] as $rule) {
							if ($rule['action'] === 'COUNT') {
								$is_collected = true;
								$log_type = 'COUNT';
								
								// 어떤 COUNT 규칙에 걸렸는지 규칙 ID를 박아두면 추후 분석에 아주 좋습니다.
								$triggered_rule_id = $rule['ruleId'];
								break;
							}
						}
					}
					
					// 2-3. 수집 대상(BLOCK 또는 COUNT)인 경우에만 공통 데이터 추출 및 DB 인서트
					if ($is_collected) {
						$client_ip  = $log_data['httpRequest']['clientIp'];
						$uri        = $log_data['httpRequest']['uri'];
						$timestamp  = $log_data['timestamp'];
						
						// HTTP 헤더에서 User-Agent와 Host(도메인) 추출
						$user_agent = '-';
						$host_domain = '-'; // 도메인을 저장할 변수 추가
						
						if (isset($log_data['httpRequest']['headers'])) {
							foreach ($log_data['httpRequest']['headers'] as $header) {
								$header_name = strtolower($header['name']);
								
								// 1. User-Agent 추출
								if ($header_name === 'user-agent') {
									$user_agent = $header['value'];
								}
								// 2. 접속 도메인(Host) 추출 추가된 로직
								else if ($header_name === 'host') {
									$host_domain = $header['value']; 
								}
							}
						}
						
						// WAF 로그 파싱 while 루프 내부 예시
						$labels_array = array();
						if (isset($log_data['labels']) && is_array($log_data['labels'])) {
							foreach ($log_data['labels'] as $lbl) {
								// "awswaf:managed:aws:core-rule-set:" 같은 긴 접두사를 떼고 핵심만 저장해도 좋습니다.
								$labels_array[] = $lbl['name'];
							}
						}
						
						// 배열을 "XSS, SQLi" 형태의 콤마 분리 문자열로 변환
						$labels_string = implode(',', $labels_array);
						$labels = mysql_real_escape_string($labels_string);
						
						// DB 저장을 위한 이스케이프 처리
						$log_type    = mysql_real_escape_string($log_type);
						$client_ip   = mysql_real_escape_string($client_ip);
						$uri         = mysql_real_escape_string($uri);
						$rule_id     = mysql_real_escape_string(isset($triggered_rule_id) ? $triggered_rule_id : '');
						$ua          = mysql_real_escape_string($user_agent);
						$domain      = mysql_real_escape_string($host_domain); // 변수 바인딩
						$waf_timestamp = (float)$timestamp;
						
						// 2-4. 테이블 컬럼 매핑 구조에 맞춘 INSERT 쿼리문 작성
						// created_at은 DB에서 DEFAULT CURRENT_TIMESTAMP로 자동 입력되므로 쿼리에서 제외하여 성능을 높였습니다.
						$query = "INSERT INTO `waf_logs` (
							`log_type`,
							`client_ip`,
							`domain`,
							`uri`,
							`rule_id`,
							`labels`,
							`ua`,
							`waf_timestamp`
			              ) VALUES (
							'{$log_type}',
							'{$client_ip}',
							'{$domain}',
							'{$uri}',
							" . ($rule_id ? "'{$rule_id}'" : "NULL") . ",
							'{$labels}',
							'{$ua}',
							{$waf_timestamp}
			              )";
			                
						// 2-5. 쿼리 실행
						$result = mysql_query($query);
						
						// 테스트 확인용 출력 (필요 없으면 주석 처리)
						echo "[{$log_type}] IP: {$client_ip} -> URI: {$domain} {$uri} \n";
			                
						if (!$result) {
			                // 서버 에러 로그에 쿼리 실패 기록 (디버깅용, 서비스 중단 방지용 @ 처리)
							@error_log("WAF 로그 DB 인서트 실패: " . mysql_error());
						}
					}
				}
				gzclose($zp);
			}
			
			// [핵심] 파싱이 끝난 낱개 .gz 파일은 즉시 삭제하여 서버 디스크 용량 방어!
			@unlink($full_path);
		}
	}
	closedir($dh);
}
?>

 

AWS WAF는 강력하다!

반응형