Browsing Tag

API

NCLOUD

[NCLOUD] Object Storage 활용 꿀팁 #1 : 파일 다운로드 없이 직접 읽기

안녕하세요 MANVSCLOUD 김수현입니다.

Object Storage는 데이터 저장과 관리의 효율성을 높여주는 네이버 클라우드의 중요한 서비스 중 하나입니다. 많은 사용자들은 기존 파일 시스템에 익숙하여 Object Storage를 다양하게 활용하지 못하고 있습니다. Windows에서는 .txt 파일을 읽고 싶을 경우 .txt 파일을 더블 클릭하여 실행하면 됐고 Linux에서는 cat이나 vi 등 명령어로 확인할 수 있습니다. 하지만 Object Storage에 저장된 .txt 파일의 내용을 확인하기 위해서 네이버 클라우드 웹 사이트에 접속하여 로그인하고 다운로드 한 뒤에 다운로드된 파일을 확인하거나 해당 Object의 URL을 이용하여 다운로드 한 뒤에 내용을 확인하고 있습니다.

Object Storage내에 있는 .txt, .json, .xml 등 다양한 텍스트 파일들을 다운로드없이 바로 확인할 수 있는 방법은 없을까요?

당연히 가능합니다.
오늘은 Object Storage에 저장된 텍스트 파일을 다운로드 하지 않고 내용을 확인할 수 있는 방법에 대해 알아보도록 하겠습니다.


Object Storage : 파일 다운로드 없이 텍스트 파일 읽기

먼저 이 과정에서는 xml.etree.ElementTree as ET 모듈을 사용할 것입니다.

xml.etree.ElementTree는 Python 표준 라이브러리의 일부로 XML 문서를 파싱하고 생성하기 위한 간단하고 효율적인 API를 제공합니다. 이 모듈은 XML 데이터와 상호 작용하는 데 필요한 기능을 포함하며 XML 파일을 읽고 수정하고 생성하는 데 사용할 수 있습니다.

xml.etree.ElementTree 모듈을 이용하여 네이버 클라우드의 Object Storage로부터 받은 XML 형식의 응답을 파싱하는 데 사용할 것입니다.

이 모듈을 이용하면 파일을 파일 시스템에 따로 저장하지 않고 파일의 내용을 메모리 상에서 직접 읽어들이고 처리할 수 있습니다.

import hashlib
import hmac
import datetime
import requests
import urllib.parse
import xml.etree.ElementTree as ET


def get_hash(key, msg):
    return hmac.new(key, msg.encode('utf-8'), hashlib.sha256).digest()

def create_signed_headers(headers):
    signed_headers = []

    for k in sorted(headers):
        signed_headers.append('%s;' % k)

    return ''.join(signed_headers)[:-1]

def create_standardized_headers(headers):
    signed_headers = []

    for k in sorted(headers):
        signed_headers.append('%s:%s\n' % (k, headers[k]))

    return ''.join(signed_headers)

def create_standardized_query_parameters(request_parameters):
    standardized_query_parameters = []

    if request_parameters:
        for k in sorted(request_parameters):
            standardized_query_parameters.append('%s=%s' % (k, urllib.parse.quote(request_parameters[k], safe='')))

        return '&'.join(standardized_query_parameters)
    else:
        return ''

class ObjectStorageSample:
    def __init__(self):
        self.region = 'kr-standard'
        self.endpoint = 'https://kr.object.ncloudstorage.com'
        self.host = 'kr.object.ncloudstorage.com'
        self.access_key = '[ACCESS_KEY_ID]'
        self.secret_key = '[SECRET_KEY]'

        self.payload_hash = 'UNSIGNED-PAYLOAD'
        self.hashing_algorithm = 'AWS4-HMAC-SHA256'
        self.service_name = 's3'
        self.request_type = 'aws4_request'

        self.time_format = '%Y%m%dT%H%M%SZ'
        self.date_format = '%Y%m%d'

    def _create_credential_scope(self, date_stamp):
        return date_stamp + '/' + self.region + '/' + self.service_name + '/' + self.request_type

    def _create_canonical_request(self, http_method, request_path, request_parameters, headers):
        standardized_query_parameters = create_standardized_query_parameters(request_parameters)
        standardized_headers = create_standardized_headers(headers)
        signed_headers = create_signed_headers(headers)

        canonical_request = (http_method + '\n' +
                             request_path + '\n' +
                             standardized_query_parameters + '\n' +
                             standardized_headers + '\n' +
                             signed_headers + '\n' +
                             self.payload_hash)

        return canonical_request

    def _create_string_to_sign(self, time_stamp, credential_scope, canonical_request):
        string_to_sign = (self.hashing_algorithm + '\n' +
                          time_stamp + '\n' +
                          credential_scope + '\n' +
                          hashlib.sha256(canonical_request.encode('utf-8')).hexdigest())

        return string_to_sign

    def _create_signature_key(self, date_stamp):
        key_date = get_hash(('AWS4' + self.secret_key).encode('utf-8'), date_stamp)
        key_string = get_hash(key_date, self.region)
        key_service = get_hash(key_string, self.service_name)
        key_signing = get_hash(key_service, self.request_type)
        return key_signing

    def _create_authorization_header(self, headers, signature_key, string_to_sign, credential_scope):
        signed_headers = create_signed_headers(headers)
        signature = hmac.new(signature_key, string_to_sign.encode('utf-8'), hashlib.sha256).hexdigest()

        return (self.hashing_algorithm + ' ' +
                'Credential=' + self.access_key + '/' + credential_scope + ', ' +
                'SignedHeaders=' + signed_headers + ', ' +
                'Signature=' + signature)

    def _sign(self, http_method, request_path, headers, time, request_parameters=None):
        time_stamp = time.strftime(self.time_format)
        date_stamp = time.strftime(self.date_format)

        credential_scope = self._create_credential_scope(date_stamp)
        canonical_request = self._create_canonical_request(http_method, request_path, request_parameters, headers)
        string_to_sign = self._create_string_to_sign(time_stamp, credential_scope, canonical_request)
        signature_key = self._create_signature_key(date_stamp)

        headers['authorization'] = self._create_authorization_header(headers, signature_key, string_to_sign, credential_scope)


    def list_objects(self, bucket_name, request_parameters=None):
        http_method = 'GET'

        time = datetime.datetime.utcnow()
        time_stamp = time.strftime(self.time_format)

        headers = {'x-amz-date': time_stamp,
                   'x-amz-content-sha256': self.payload_hash,
                   'host': self.host}

        request_path = '/%s' % bucket_name

        self._sign(http_method, request_path, headers, time, request_parameters)
        request_url = self.endpoint + request_path
        r = requests.get(request_url, headers=headers, params=request_parameters)

        if r.status_code == 200:
            root = ET.fromstring(r.content)
            objects = []

            for content in root.findall('{http://s3.amazonaws.com/doc/2006-03-01/}Contents'):
                key = content.find('{http://s3.amazonaws.com/doc/2006-03-01/}Key').text
                if key.endswith('.txt'): 
                    last_modified = content.find('{http://s3.amazonaws.com/doc/2006-03-01/}LastModified').text
                    objects.append({'Key': key, 'LastModified': last_modified})

            objects.sort(key=lambda x: x['LastModified'], reverse=True)
            return objects
        else:
            return []

    def read_txt_object(self, bucket_name, object_name): 
        http_method = 'GET'

        time = datetime.datetime.utcnow()
        time_stamp = time.strftime(self.time_format)

        headers = {'x-amz-date': time_stamp,
                   'x-amz-content-sha256': self.payload_hash,
                   'host': self.host}

        request_path = '/%s/%s' % (bucket_name, object_name)

        self._sign(http_method, request_path, headers, time)
        request_url = self.endpoint + request_path
        r = requests.get(request_url, headers=headers)

        if r.status_code == 200:
            return r.content.decode('utf-8')


if __name__ == '__main__':
    sample = ObjectStorageSample()
    objects = sample.list_objects('[BUCKET_NAME]', {'max-keys': '10'})

    if objects:
        most_recent_object = objects[0]  # 가장 최신의 .txt 파일
        recent_text = sample.read_txt_object('[BUCKET_NAME]', most_recent_object['Key'])

        print(recent_text)

위 코드는 특정 Object Storage 버킷에서 가장 최신의 .txt 파일을 다운로드 하지 않고 내용을 출력할 수 있는 코드입니다.

코드에 포함된 [ACCESS_KEY_ID], [SECRET_KEY], [BUCKET_NAME]는 각 항목에 맞게 수정이 필요합니다.

  • [ACCESS_KEY_ID] : Object Storage에 대한 권한을 보유한 Sub Account의 Access Key
  • [SECRET_KEY] : Object Storage에 대한 권한을 보유한 Sub Account의 Secret Key
  • [BUCKET_NAME] : 텍스트 파일이 저장된 Object Storage의 Bucket Name

위와 같이 .txt 파일이 Object Storage 버킷에 업로드 되어있는 경우 Object Storage API를 이용하여 해당 파일을 찾아 읽을 것입니다.

실행 결과 아래와 같이 .txt 파일의 내용이 잘 출력되는 것까지 확인됩니다.

우리는 이러한 방법을 통해 Object Storage에 저장된 파일을 다운로드없이도 내용을 확인할 수 있습니다. .txt 파일 외 .json 파일 역시 동일하게 사용할 수 있으며 json 모듈을 이용하여 파싱 후 원하는 값만 가져올 수도 있습니다.


Personal Comments

이 글을 통해 우리는 Object Storage에서 파일을 다운로드하지 않고 직접 읽는 방법을 알아보았습니다. 이 과정은 작업 시간을 단축시키는 데 도움이 될 수 있습니다. 또한 이러한 방법을 통해 다양한 Object Storage 사용 방법을 생각해볼 수도 있을 것입니다.

긴 글 읽어주셔서 감사합니다.