Browsing Tag

보안

NCLOUD

[NCLOUD] ACG(Access Control Group) 전체 허용 정책 탐지를 위한 Serverless 시스템 구축

안녕하세요 MANVSCLOUD 김수현입니다.

네이버 클라우드 플랫폼(NCLOUD)은 다양한 기능과 서비스를 제공하는 강력한 국내 클라우드 서비스입니다. 그 중에서도 ACG(Access Control Group)는 서버 접근을 제어하는 중요한 서비스로 사용자는 이를 통해 서버에 접근하는 네트워크를 제어 및 관리할 수 있습니다. 하지만 이 서비스를 잘 활용하고 있다는 기준은 무엇일까요?

개인적인 기준이나 기업에서 정해져있는 거버넌스 그리고 적용된 보안 컴플라이언스 등에 따라서 어느정도 차이가 있을 수는 있겠지만 결국 ACG 설정이 잘 되어있다는 기준은 Source Server와 Destination Server가 통신되어야할 특정 포트로만 통신할 수 있도록 허용되어있는 것입니다.

가끔 우리는 편리함을 목적으로 ‘0.0.0.0/0’ 대역을 허용하는 큰 위협에 노출되기도 합니다. 이는 모든 IP 주소에서 접근이 가능함을 의미하므로 클라우드 자산에 대한 공격 범위가 광범위해질 수 있어 ACG 설정이 적절히 이루어져 있는지 확인하고 필요한 보안 조치를 취해야 합니다.

본 포스트에서는 ACG 전체 허용 룰을 탐지하고 이를 Slack으로 알림이 발생하도록 하는 시스템을 Cloud Functions 서비스를 이용해 구축하는 방법에 대해 알아볼 것입니다.

이 방법을 통해 클라우드 환경에서의 보안 관리를 더욱 강화하고 실시간으로 보안 상태를 파악하는 데 도움이 될 것입니다.


Slack 알림

네이버 클라우드 플랫폼의 Cloud Functions를 이용하여 ACG 정책의 0.0.0.0/0 대역 허용을 탐지하는 코드를 실행할 수 있습니다. 이를 이용해 탐지한 위험한 ACG 정책에 대한 알림을 Slack으로 받을 수 있도록 해보겠습니다.

먼저 Slack 설치 및 가입이 되어있지 않다면 위 링크를 통해 진행해주시고 워크스페이스 생성 후 모니터링 알림을 받을 전용 채널을 하나 생성해줍니다.

이후 [앱 추가]에서 incoming webhooks을 검색하면 사용 가능한 앱이 아래 그림과 같이 보이는 것을 확인할 수 있습니다.

슬랙에서 설정 해주어야할 부분은 크게 없습니다. (설치+가입+앱 추가)

이제 추가한 Incoming WebHooks 앱을 설정해주어야하는데 이 부분 역시 크게 어렵지 않습니다.

먼저 [Slack에 추가] 를 눌러 앱을 추가해줍시다.

이후 새로운 화면으로 넘어가게되면 그냥 쭈욱 내려와서 아래 그림과 같은 부분만 설정해주고 복사해주면 됩니다.

  • 채널에 포스트 : 이 부분은 위 Slack에서 생성해준 채널을 선택해줍니다.
  • 웹후크 URL : 해당 URL은 [URL 복사]를 이용해 따로 복사해주세요.
    Cloud Functions 코드에 해당 URL을 추가할 것입니다.
  • 이름 사용자 지정 : 예를 들어 ncloud-cloud-insight 라고 지정하면 ncloud-cloud-insight이라는
    이름의 봇이 알림을 계속 해준다고 보면 됩니다.
  • [설정 저장]

이미지 업로드 등 나머지 설정은 자유입니다.


Cloud Functions

# VPC, Server 생성에 대한 가이드는 생략합니다.
# Python은 3.7 버전이 사용되었습니다.

최근 포스트가 대부분 Cloud Functions을 이용한 내용이 대부분이다보니 Trigger나 Package에 대한 내용은 생략하고 바로 Action에 대한 내용을 작성해보겠습니다.

  • 타입 : 기본
  • Action 이름, 설명 : 자유롭게 입력
  • 언어(소스코드) : Python 3.7
  • 타입(소스코드) : 코드
import requests
import time
import base64
import hashlib
import hmac
import json
import urllib.request

method = "GET"
timestamp = int(time.time() * 1000)
timestamp = str(timestamp)
api_server = "https://ncloud.apigw.ntruss.com"

def ACG_LIST(REGION, access_key, secret_key):
    method = "GET"
    timestamp = str(int(time.time() * 1000))
    api_server = "https://ncloud.apigw.ntruss.com"

    result = []

    uri = "/vserver/v2/getAccessControlGroupList"
    uri = uri + "?responseFormatType=json&regionCode=" + REGION

    message = method + " " + uri + "\n" + timestamp + "\n" + access_key
    message = bytes(message, 'UTF-8')
    signingKey = base64.b64encode(hmac.new(secret_key, message, digestmod=hashlib.sha256).digest())

    http_header = {
        'x-ncp-apigw-signature-v2': signingKey,
        'x-ncp-apigw-timestamp': timestamp,
        'x-ncp-iam-access-key': access_key
    }

    response = requests.get(api_server + uri, headers=http_header)

    data = json.loads(response.text)

    for acg in data['getAccessControlGroupListResponse']['accessControlGroupList']:
        result.append((acg['accessControlGroupNo'], acg['accessControlGroupName']))

    return result


def ACG_DETECTION(REGION, access_key, secret_key, exceptions=None):
    ANY_DATA = []
    secret_key = bytes(secret_key, 'UTF-8')

    ACG_NO = ACG_LIST(REGION, access_key, secret_key)

    for acg in ACG_NO:
        if acg[0] in exceptions:  
            continue

        uri = "/vserver/v2/getAccessControlGroupRuleList"
        uri = uri + "?responseFormatType=json&regionCode=" + REGION + "&accessControlGroupNo=" + acg[0]

        message = method + " " + uri + "\n" + timestamp + "\n" + access_key
        message = bytes(message, 'UTF-8')
        signingKey = base64.b64encode(hmac.new(secret_key, message, digestmod=hashlib.sha256).digest())

        http_header = {
            'x-ncp-apigw-signature-v2': signingKey,
            'x-ncp-apigw-timestamp': timestamp,
            'x-ncp-iam-access-key': access_key
        }

        response = requests.get(api_server + uri, headers=http_header)

        data = json.loads(response.text)

        for rule in data['getAccessControlGroupRuleListResponse']['accessControlGroupRuleList']:
            if rule['ipBlock'] == '0.0.0.0/0' and rule['portRange'] not in ['80', '443']:   # 예외 처리할 포트

                ANY_DATA.append({
                    "accessControlGroupNo": rule['accessControlGroupNo'],
                    "accessControlGroupName": acg[1],  # add the ACG name
                    "protocol": rule['protocolType']['code'],
                    "ipBlock": rule['ipBlock'],
                    "portRange": rule['portRange'],
                    "type": rule['accessControlGroupRuleType']['codeName']
                })

    return ANY_DATA

def post_slack(toSlack):
    WEBHOOK_URL = "복사해둔 WebHook URL"
    message = toSlack
    send_data = {
        "text": message,
    }

    send_text = json.dumps(send_data)

    request = urllib.request.Request(
        WEBHOOK_URL,
        data=send_text.encode('utf-8'),
    )

    with urllib.request.urlopen(request) as response:
        slack_message = response.read()
        
def main(args):
    REGION = args["REGION"]
    access_key = args["NCLOUD_ACCESS_KEY"]
    secret_key = args["NCLOUD_SECRET_KEY"]
    exceptions = ['11111', '22222']  # 예외 처리할 ACG의 ID

    ACG_DATA = ACG_DETECTION(REGION, access_key, secret_key, exceptions)

    for acg in ACG_DATA:
        ACG_NO = f"{acg['accessControlGroupNo']} ({acg['accessControlGroupName']})"  # include the ACG name
        PROTOCOL = f"{acg['protocol']}"
        IP = f"{acg['ipBlock']}"
        PORT = f"{acg['portRange']}"
        TYPE = f"{acg['type']}"

        post_slack("ACG_ID :" + ACG_NO + "\n PROTOCOL:" + PROTOCOL + "\n IP : " + IP + "\n 포트:" + PORT + "\n TYPE: " + TYPE)

    return {'status': 'OK'}

위 코드에서 필요에 맞게 수정 및 추가가 진행되어야합니다.

해당 코드에서는 80과 443포트는 예외 처리 포트로 지정되어 있습니다.
또한 위에서 Incoming WebHooks 앱 추가 후 복사해둔 WebHook URL을 코드에 적절히 추가해줍니다. 추가로 예외 처리가 필요한 ACG가 있다면 exceptions 부분에 예외 처리할 ACG ID를 추가하여 추가한 ACG에 대해서는 정책 점검에 대한 결과를 예외 처리할 수 있습니다.

  • 디폴트 파라미터(JSON)
{"VPC_ID":"6585","NCLOUD_ACCESS_KEY":"AAAAAAAAAAA","NCLOUD_SECRET_KEY":"BBBBBBBBBBBBBBBBBB","REGION":"KR"}

VPC_ID : VPC의 ID
NCLOUD_ACCESS_KEY : 해당 코드가 동작하기 위한 최소한의 권한을 가진 ACCESS KEY
NCLOUD_SECRET_KEY : 위 ACCESS KEY의 SECRET KEY
REGION : 리전 코드


Result

위 Cloud Functions 생성 과정을 마친 후 Action 실행 시 0.0.0.0/0 정책이 있을 경우 아래와 같이 Slack을 통해 알림을 받을 수 있습니다.

ACG의 이름과 함께 어떤 정책이 전체 허용되었는지를 바로 확인할 수 있으므로 관리자가 알림을 통해 즉각적인 대응이 가능합니다.


Personal Comments

지금까지 네이버 클라우드 플랫폼의 ACG(Access Control Group) 정책을 Cloud Functions으로 점검하는 방법에 대해 알아보았습니다. 이를 통해 위험한 ‘0.0.0.0/0’ 대역을 허용하는 ACG 정책을 탐지하고 이에 대한 알림을 받을 수 있는 시스템을 구축해보았습니다. 이러한 방법은 클라우드 환경에서의 보안 관리를 강화하는데 큰 도움이 될 것입니다.

Cloud Functions와 Slack의 Incoming WebHooks 앱을 이용하면 복잡한 인프라 없이도 실시간 보안 알림 시스템을 구축할 수 있습니다. 이는 시스템을 더욱 유연하게 만들고 보안 이슈에 대한 신속한 대응을 가능하게 합니다.

이 포스트가 여러분의 클라우드 보안 강화에 도움이 되길 바랍니다.

긴 글 읽어주셔서 감사합니다.