Browsing Tag

security

NCLOUD

[NCLOUD] API를 활용한 DoS 공격 차단 시스템 구현

안녕하세요 MANVSCLOUD 김수현입니다.

IT 보안은 기술이 발전할수록 더욱 중요해지는 요소입니다.
최근 특정 해외 IP 하나가 웹을 통해 초당 수십번의 연결로 서버 내 디스크 용량이 가득차는 사례를 겪었습니다. 이러한 공격은 자원 고갈을 노린 DoS 공격 유형 중 하나인데요.

SQL 인젝션, XSS, CSRF 등 공격을 방어하는 데 유용한 WAF 서비스로는 효과적이지 못하여 서비스 다운으로 이어질 수 밖에 없었습니다.

네이버 클라우드 플랫폼의 Security Monitoring 서비스에는 WAF 상품 외에도 Anti-DDoS, IPS 등 다양한 서비스가 존재하지만 보안에 큰 비용을 투자할 수 없는 기업이나 개인 테스트용으로 사용하는 경우에는 비용에 대한 부담이 생길 수 밖에 없습니다.

이러한 문제로 오늘은 네이버 클라우드에서 제공하고 있는 다양한 서비스의 API 들을 이용하여 간단하게 웹으로부터 들어오는 DoS 공격 차단 시스템을 만들어볼 것입니다.

DoS 공격을 전문적으로 차단하기 위한 서비스보다는 완벽할 순 없겠지만 간단하지만 어느정도 만족스러울 수 있는 시스템을 구현해봅시다.


Idea

생각은 그리 오래하지 않았고 그냥 manvscloud.com 환경을 그대로 이용하여 일단 시작해보기로 했습니다.

아래 이미지는 생각한 DoS 공격 차단 시스템이 동작하는 방식입니다.

웹 서버에 연결된 ACG에서 HTTP 접속은 Application Load Balancer의 Subnet 대역에서만 접근할 수 있도록 되어있기때문에 직접적으로 Web 서버로 연결되는 것은 불가능한 상태입니다.

결국 웹 접근은 Application Load Balancer를 통해서만 가능하기때문에 ALB의 Subnet에 연결된 NACL(Network Access Control List)에 정책을 추가/제거하는 방법을 선택했습니다.

여기서 중요한 점은 NACL에는 정책이 200개까지만 추가 가능하기때문에 여러개의 ALB 또는 Subnet이 하나의 NACL를 함께 쓰는 경우에는 문제가 될 수 있으므로 하나의 ALB별로 각각의 Subnet을 갖도록하고 해당 Subnet용 NACL이 연결되도록 해야합니다.

하나의 IP가 리소스 고갈을 목적 웹 접근을 했다고 가정하고 시나리오를 작성해보겠습니다.

1) 접속자가 Application Load Balancer를 통해 접근

2) Application Load Balancer의 액세스 로그를 Cloud Log Analytics에서 수집

3) DoS 공격 차단 시스템이 구현된 서버에서 Cloud Log Analytics의 Application Load Balancer 로그를 분석

4) 예외 IP를 제외한 나머지 IP가 x분당 x회 이상 접근 시 해당 IP는 NACL에 차단 정책 추가

5) 차단된 IP로는 웹 접근 불가

6) 정해진 특정 시간이 지난 IP는 NACL에서 차단 정책 제거

이제 위 시나리오대로 동작할 수 있도록 DoS 공격 차단 시스템을 구현해보도록 합시다.
(참고로 Cloud Log Analytic를 사용하고 싶지 않다 or ALB 액세스 로그 수집을 하고 싶지 않다면 WEB 서버에서 x-forwarded-for 설정을 해서 해당 환경에 맞게 구현할 수도 있습니다.)


DoS Blocker

이 시스템 이름은 DoS Blocker라고 정했습니다.
Python을 사용하여 만들었고 총 3개의 파일만 생성하면 됩니다.

– nacl_auto_class.py
– nacl_auto_block.py
– nacl_auto_remove.py

# VPC, Server 생성에 대한 가이드는 생략합니다.
# Python은 3.7 버전이 사용되었습니다.

# 해당 과정에서는 편의상 환경 변수에 ACCESS KEY 추가한 점 참고 부탁드립니다.

mkdir -p dos-blocker/manvscloud-alb
mkdir /var/log/nacl
cd dos-blocker/manvscloud-alb

먼저 위와 같이 디렉토리 생성 및 이동을 해주세요.


  • nacl_auto_class.py

이 파일은 간단한 Class 파일입니다.
코드의 중복을 줄이고 코드가 깔끔해질 수 있도록 아래와 같이 생성하였습니다.

import os
import base64
import hmac
import hashlib
import requests
import time
import json

class NACLAutomation:
    def __init__(self, nacl_no, region, drop_port, drop_proto, api_server_ncloud, log_filename=None, keyword=None):
        self.nacl_no = nacl_no
        self.region = region
        self.drop_port = drop_port
        self.drop_proto = drop_proto
        self.api_server_ncloud = api_server_ncloud
        self.log_filename = log_filename
        self.keyword = keyword

    def get_access_headers(self, timestamp, method, uri):
        access_key = os.environ['NCLOUD_ACCESS_KEY']
        secret_key = os.environ['NCLOUD_SECRET_KEY']
        secret_key = bytes(secret_key, 'UTF-8')
        message = method + " " + uri + "\n" + timestamp + "\n" + access_key
        message = bytes(message, 'UTF-8')
        signingKey = base64.b64encode(hmac.new(secret_key, message, digestmod=hashlib.sha256).digest())
        http_header = {
            'x-ncp-apigw-signature-v2': signingKey,
            'x-ncp-apigw-timestamp': timestamp,
            'x-ncp-iam-access-key': access_key
        }
        return http_header

    def make_request(self, method, uri, payload=None, api_server=None):
        timestamp = str(int(time.time() * 1000))
        http_header = self.get_access_headers(timestamp, method, uri)

        api_server = api_server or self.api_server_ncloud

        if method == "POST":
            response = requests.post(api_server + uri, headers=http_header, json=payload)
        else:
            response = requests.get(api_server + uri, headers=http_header)
        return response.text

    def get_nacl_rules(self):
        uri = "/vpc/v2/getNetworkAclRuleList?responseFormatType=json&regionCode=" + self.region + "&networkAclNo=" + self.nacl_no
        response = self.make_request("GET", uri)
        return json.loads(response)['getNetworkAclRuleListResponse']['networkAclRuleList']

  • nacl_auto_block.py

1) exclude_ips = [“1.1.1.1/32”] 부분에서 Block 되지 않길 바라는 IP를 추가할 경우 해당 IP는 NACL에 차단 등록이 되지 않습니다.

2) payload에서 logTypes, timeZone, timestampFrom, interval, pageSize 수정이 필요할 경우 수정해주세요.

3) block_ips = [f”{ip}/32″ for ip, count in ip_counter.items() if count > 300] 부분에서 300 부분은 위 payload에서 설정한 시간 내에 300회 이상 Count 될 경우 차단이 되도록 해둔 것입니다. 원하는 Count 값이 있다면 300 부분을 다른 수치로 변경해주시면 됩니다.

4) 아래 코드에서 nacl_no=”1111″의 1111 부분과 log_filename=”/var/log/nacl/drop-1111.log”의 1111 부분은 ALB가 있는 서브넷의 NACL No로 변경해주세요.
(Cloud Log Analytics에 검색 시 사용되는 keyword도 따로 설정해줄 수 있어요.)

if name == “main“:
nacl_block = NACLAutoBlock(nacl_no=”1111″,
region=”KR”,
drop_port=”1-65535″,
drop_proto=”TCP”,
api_server_ncloud=”https://ncloud.apigw.ntruss.com”,
api_server_cloud_log_analytics=”https://cloudloganalytics.apigw.ntruss.com”,
log_filename=”/var/log/nacl/drop-1111.log”,
keyword=””)

from nacl_auto_class import NACLAutomation
import logging
import datetime
import time
import json
from collections import Counter
import re

#Block 예외 처리(exclude_ips = ["1.2.3.4/32", "5.6.7.8/32"])
exclude_ips = ["1.1.1.1/32"]

class NACLAutoBlock(NACLAutomation):
    def __init__(self, nacl_no, region, drop_port, drop_proto, api_server_ncloud, api_server_cloud_log_analytics, log_filename, keyword):
        super().__init__(nacl_no, region, drop_port, drop_proto, api_server_ncloud, log_filename)
        self.api_server_cloud_log_analytics = api_server_cloud_log_analytics
        self.action_code = "DROP"

    def write_log(self, ip):
        logging.basicConfig(filename=self.log_filename, level=logging.INFO)
        current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        logging.info('[%s] %s DROP', current_time, ip)

    def extract_ips_to_block(self):
        payload = {
            "keyword" : self.keyword,
            "logTypes" : "application_loadbalancer_access",
            "timeZone": "+09:00",
            "timestampFrom": "now-1h",
            "timestampTo": "now",
            "interval": "5m",
            "pageNo": 1,
            "pageSize": 100
        }

        response = self.make_request("POST", "/api/v1/logs/search?responseFormatType=json", payload, self.api_server_cloud_log_analytics)

        data = json.loads(response)
        ip_counter = Counter()

        for result in data['result']['searchResult']:
            ip = re.search(r"\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b", result['logDetail'])
            if ip:
                ip_counter[ip.group()] += 1

        block_ips = [f"{ip}/32" for ip, count in ip_counter.items() if count > 300]
        return block_ips

    def get_min_priority(self):
        rules = self.get_nacl_rules()
        return min(rule['priority'] for rule in rules)

    def is_ip_exist(self, ip):
        rules = self.get_nacl_rules()
        return any(rule['ipBlock'] == ip for rule in rules)

    def add_nacl_rule(self, ip, priority):

        if ip in exclude_ips:
            return

        uri = f"/vpc/v2/addNetworkAclInboundRule?responseFormatType=json&regionCode={self.region}&networkAclNo={self.nacl_no}&networkAclRuleList.1.priority={priority}&networkAclRuleList.1.protocolTypeCode={self.drop_proto}&networkAclRuleList.1.ipBlock={ip}&networkAclRuleList.1.portRange={self.drop_port}&networkAclRuleList.1.ruleActionCode={self.action_code}"
        self.make_request("GET", uri)
        self.write_log(ip)

if __name__ == "__main__":
    nacl_block = NACLAutoBlock(nacl_no="1111",
                               region="KR",
                               drop_port="1-65535",
                               drop_proto="TCP",
                               api_server_ncloud="https://ncloud.apigw.ntruss.com",
                               api_server_cloud_log_analytics="https://cloudloganalytics.apigw.ntruss.com",
                               log_filename="/var/log/nacl/drop-1111.log",
                               keyword="")

    ips_to_block = nacl_block.extract_ips_to_block()
    min_priority = nacl_block.get_min_priority()

    for i, ip in enumerate(ips_to_block, start=1):
        if not nacl_block.is_ip_exist(ip):
            nacl_block.add_nacl_rule(ip, str(min_priority - i))
            time.sleep(15)

  • nacl_auto_remove.py

해당 코드는 log 파일을 검사하여 특정 시간이 지난 IP가 NACL에서 차단되어있다면 정책을 제거하는 코드입니다.

1) if (current_time – last_seen) > timedelta(days=1): 부분에서 days=1은 하루입니다. 하루가 지난 IP는 제거되게 됩니다. 이 부분을 변경할 수도 있습니다.

2) 아래 코드에서 nacl_no=”1111″의 1111 부분과 log_filename=”/var/log/nacl/drop-1111.log”의 1111 부분은 ALB가 있는 서브넷의 NACL No로 변경해주세요.

if name == “main“:
nacl_remove = NACLAutoRemove(nacl_no=”1111″,
region=”KR”,
drop_port=”1-65535″,
drop_proto=”TCP”,
api_server_ncloud=”https://ncloud.apigw.ntruss.com”,
log_filename=”/var/log/nacl/drop-1111.log”)

from nacl_auto_class import NACLAutomation
import re
from datetime import datetime, timedelta

class NACLAutoRemove(NACLAutomation):
    def __init__(self, nacl_no, region, drop_port, drop_proto, api_server_ncloud, log_filename):
        super().__init__(nacl_no, region, drop_port, drop_proto, api_server_ncloud, log_filename)
        self.action_code = "DROP"
        self.uri_remove = "/vpc/v2/removeNetworkAclInboundRule"

    def get_ip_list(self):
        with open(self.log_filename) as f:
            lines = f.readlines()
        current_time = datetime.now()
        ip_last_seen = {}
        ip_list = []

        for line in lines:
            log_time, ip = re.search(r'INFO:root:\[(.*?)\] (.*?)/32 DROP', line).groups()
            log_time = datetime.strptime(log_time, "%Y-%m-%d %H:%M:%S")
            ip_last_seen[ip] = max(log_time, ip_last_seen.get(ip, log_time))

        for ip, last_seen in ip_last_seen.items():
            if (current_time - last_seen) > timedelta(days=1):
                ip_list.append(ip)

        return ip_list


    def get_rule_priority(self, remove_ip):
        rules = self.get_nacl_rules()
        for rule in rules:
            if rule['ipBlock'] == remove_ip+"/32":
                return rule['priority']
        return None

    def remove_nacl(self, remove_ip, priority):
        uri = self.uri_remove
        uri = uri + "?responseFormatType=json&regionCode=" + self.region + "&networkAclNo=" + self.nacl_no + "&networkAclRuleList.1.priority=" + str(priority) + "&networkAclRuleList.1.protocolTypeCode=" + self.drop_proto + "&networkAclRuleList.1.ipBlock=" + remove_ip + "/32" + "&networkAclRuleList.1.portRange=" + self.drop_port + "&networkAclRuleList.1.ruleActionCode=" + self.action_code
        self.make_request("GET", uri)

if __name__ == "__main__":
    nacl_remove = NACLAutoRemove(nacl_no="1111",
                                 region="KR",
                                 drop_port="1-65535",
                                 drop_proto="TCP",
                                 api_server_ncloud="https://ncloud.apigw.ntruss.com",
                                 log_filename="/var/log/nacl/drop-1111.log")

    ip_list = nacl_remove.get_ip_list()
    for ip in ip_list:
        priority = nacl_remove.get_rule_priority(ip)
        if priority:
            nacl_remove.remove_nacl(ip, priority)

최종적으로 완성된 구성은 아래 tree와 같습니다.

[root@manvscloud-bastion-kr2 dos-blocker]# tree
.
└── manvscloud-alb
   ├──nacl_auto_block.py
   ├──nacl_auto_class.py
   ├──nacl_auto_remove.py
   └──__pycache__
       └── nacl_auto_class.cpython-37.pyc


2 directories, 4 files

완성되었다면 crontab을 이용하여 원하시는 시간대에 동작하게 하거나 등 다양한 방법으로 코드가 동작하게 하면 되겠습니다.

[참고] 뒤늦게 Application Load Balancer별로 구분해야겠다고 판단하여 Application Load Balancer별로 분리하여 사용할 경우 코드를 조금 수정해야할 수도 있습니다. 위 코드들은 현재 전체 Application Load Balancer를 대상으로 분석하기때문에 instance_no로 구분하여 분석하도록 해주면 원하는 결과를 얻으실 수 있을 것입니다.


Personal Comments

지금까지 Cloud Log Analytics와 NACL의 API를 활용하여 웹 접근 로그 분석을 기반으로 DOS 공격이 의심되는 IP를 실시간으로 자동 차단하고 일정 시간 후에 해당 차단 규칙을 자동으로 제거하는 시스템의 구현에 대해 상세하게 알아보았습니다.

이 시스템은 개인 테스트 환경에서 적은 비용으로 사이버 보안 위협으로부터 최소한의 효과는 얻을 수 있을 것으로 보입니다.

긴 글 읽어주셔서 감사합니다.