codaichi’s note

Corporate Engineerの学習記録

NotionのデータベースをBigQueryに自動連携してみた - Google Cloud Functionsと Cloud Schedulerを学ぶ

はじめに

今回はHappiness Chainの課題ではありませんが、BigQueryやCloud Functions, Cloud SchedulerといったGoogle Cloudサービスについて学んだので、やってみたことをまとめてみます。

実現したいこと

実現したいことは以下のようになります。

まずNotionのDatabaseをデータの取得元し、Cloud Functionsを使ってデータの取得と転送処理を行います。 その後、転送先のBig Queryにデータをアップロード。 最後に一連の流れをCloud Schedulerで定期実行できるように設定します。

イメージとしては下記になります。

実際のコードと実装の全体像

Google Functionsにアップロードするコードはこちらです。

import os
import logging
import requests
import functions_framework
from datetime import datetime
from google.cloud import bigquery
from dataclasses import dataclass
from google.api_core import retry
from google.cloud import secretmanager
from typing import List, Dict, Any

# Retrieve secrets from Google Secret Manager
def get_secret(secret_name):
    client = secretmanager.SecretManagerServiceClient()
    project_id = os.getenv("GCP_PROJECT")
    name = f"projects/{project_id}/secrets/{secret_name}/versions/latest"
    response = client.access_secret_version(name=name)
    return response.payload.data.decode("UTF-8")

# Configuration
@dataclass
class NotionConfig:
    token: str = get_secret("NOTION_TOKEN")
    database_id: str = get_secret("NOTION_DATABASE_ID")

@dataclass
class BigQueryConfig:
    project_id: str = get_secret("PROJECT_ID")
    dataset_id: str = get_secret("DATASET_ID")
    table_id: str = get_secret("TABLE_ID")

# Notion API Client
class NotionClient:
    BASE_URL = "https://api.notion.com/v1/databases/"
    
    def __init__(self):
        self.headers = {
            "Authorization": f"Bearer {NotionConfig().token}",
            "Notion-Version": "2022-06-28",
            "Content-Type": "application/json"
        }

    def fetch_pages(self) -> List[Dict[str, Any]]:
        url = f"{self.BASE_URL}{NotionConfig().database_id}/query"
        pages, cursor = [], None
        while True:
            response = requests.post(url, json={"start_cursor": cursor} if cursor else {}, headers=self.headers)
            response.raise_for_status()
            data = response.json()
            pages.extend(data.get("results", []))
            if not data.get("has_more"):
                break
            cursor = data.get("next_cursor")
        return pages

# Data Transformer
class NotionDataTransformer:
    @staticmethod
    def transform(pages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        return [{
            "page_id": p["id"],
            "created_time": p["created_time"],
            "last_edited_time": p["last_edited_time"]
        } for p in pages ]

# BigQuery client
class BigQueryClient:
    def __init__(self):
        self.client = bigquery.Client()

    def merge_data(self, table_id: str, data: List[Dict[str, Any]]):
        table_ref = f"{BigQueryConfig().project_id}.{BigQueryConfig().dataset_id}.{table_id}"
        job_config = bigquery.LoadJobConfig(
            autodetect=True,
            write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
            source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
        )
        job = self.client.load_table_from_json(data, table_ref, job_config=job_config)
        job.result()
        print(f"Upload {len(data)} record to {table_ref}")

# Cloud Function Entry Point
@functions_framework.http
def notion_to_bigquery(request=None):
    start_time = datetime.now()
    
    try:
        notion_client, transformer, bq_client = NotionClient(), NotionDataTransformer(), BigQueryClient()
        notion_data = notion_client.fetch_pages()
        transformed_data = transformer.transform(notion_data)
        bq_client.merge_data(BigQueryConfig().table_id, transformed_data)
        elapsed_time = (datetime.now() - start_time).total_seconds()
        return f"Sync completed in {elapsed_time:.2f} seconds", 200
    except Exception as e:
        logging.basicConfig(level=logging.INFO)
        return str(e), 500

またrequirements.txtには下記を記載しました。

functions-framework
google-cloud-bigquery
google-api-core
google-cloud-secret-manager
requests

準備

ゴールがクリアになったところで、準備を進めていきたいと思います。準備するものとしては3つあります。

Notion APIの設定とGoogle Cloudの環境設定、Big Queryのテーブル作成です。 (Pythonコードは上記で示しているので、準備には入れていません)

Notion API設定

まずはIntegrationの作成とDatabaseとの設定から進めていきます。Integrationsページにアクセスして、New Integrationから作成することができます。Integrationが作成できるとIntegration Secretが発行されるので、メモしておきます。

Notion API

Integrationが作成できたら、DatabaseとのConnectionを設定します。

右上の3点マークをクリックし、表示されたメニューからConnectionsを選択、先ほどのIntegrationsを検索してコネクトします。

Integrationの設定

Google Cloud環境設定

続けて、Google Cloudでのプロジェクト作成を行います。一旦下記を実行しておけば問題ないと思います。 あとの工程でCloud Functionsにデプロイする際にはサービスアカウントにさらに権限を付与することになると思います。

権限付与のCommandはほぼ同じですので、必要な権限にroleの記載を変更して実行可能です。

# Projectの作成
gcloud projects create [PROJECT_ID]

# Datasetの設定 
bq mk --location asia-northeast1 --description [description] --dataset [DATASET]

# サービスアカウントの作成
gcloud iam service-accounts create notion-sync \
  --display-name [Service Account(好きな名前に変更)]
  
# BigQuery データ編集者ロールの付与
gcloud projects add-iam-policy-binding [PROJECT_ID] \
  --member="serviceAccount:[Service Account]@[PROJECT_ID].iam.gserviceaccount.com" \
  --role="roles/bigquery.dataEditor"

# Cloud Functions 実行者ロールの付与
gcloud projects add-iam-policy-binding [PROJECT_ID] \
  --member="serviceAccount:[Service Account]@[PROJECT_ID].iam.gserviceaccount.com" \
  --role="roles/cloudfunctions.invoker" 

# JSONキーファイルの作成
gcloud iam service-accounts keys create service-account-key.json \
  --iam-account="[Service Account]@[PROJECT_ID].iam.gserviceaccount.com"

JSONキーファイルまで作成できたら、下記の設定値をメモしておきます。

NOTION_TOKEN="token"
NOTION_DATABASE_ID="database_id"
GC_PROJECT_ID="project_id"
GC_DATASET_ID="dataset_id"
GC_TABLE_ID="table_id"
GC_CREDENTIALS_PATH="/path/to/your/service-account.json"

BigQueryのテーブル作成

テーブル作成もCommand Lineで作成することもできますが、コンソール画面からも対応可能です。

今回はスキーマの設定などを自動で行えるようにするため、main.py内でautodetect=True と設定しました。

デプロイと設定編

続いてmain.pyをCloud Functionsにデプロイしていきます。

Secret Managerの設定

先ほどメモしておいた設定値をSecret Managerに設定していきます。 Secret Manager は、Google Cloudのサービスの1つで、API キー、ユーザー名、パスワード、証明書などの機密データを保存して管理できるシークレットおよび認証情報の管理サービスです。

まずSecretsを作成します。この時権限が不足している場合には先ほどの権限付与のCommandを修正して権限を付与してください。

gcloud secrets create NOTION_TOKEN --replication-policy="automatic"
gcloud secrets create NOTION_DATABASE_ID --replication-policy="automatic"
gcloud secrets create PROJECT_ID --replication-policy="automatic"
gcloud secrets create DATASET_ID --replication-policy="automatic"
gcloud secrets create TABLE_ID --replication-policy="automatic"

次に先ほどの設定値をSecretにアップロードします。

echo -n "notion-token" | gcloud secrets versions add NOTION_TOKEN --data-file=-
echo -n "database-id" | gcloud secrets versions add NOTION_DATABASE_ID --data-file=-
echo -n "project-id" | gcloud secrets versions add PROJECT_ID --data-file=-
echo -n "dataset-id" | gcloud secrets versions add DATASET_ID --data-file=-
echo -n "table-id" | gcloud secrets versions add TABLE_ID --data-file=-

Cloud Functionsのデプロイ手順

Cloud Functionsへのデプロイは下記で行うことができます。

gcloud functions deploy notion_to_bigquery \
  --runtime python310 \
  --trigger-http \
  --region=us-central1 \
  --set-env-vars GCP_PROJECT=project-id

notion_to_bigqueryはmain.py で定義している名前と一致させます。

@functions_framework.http
def notion_to_bigquery(request=None):

Cloud Schedulerの設定

スケジューラーのジョブの設定は下記で行います。

job nameは任意のものを設定できます。 uriはCloud FunctionsのURLを入れてください。 今回は毎日1回、日本時間の15時に実行される設定です。

gcloud scheduler jobs create http [job name] \
    --location=asia-northeast1 \
    --schedule="0 15 * * *" \
    --time-zone="Asia/Tokyo" \
    --uri=[Cloud FunctionsのURL] \
    --http-method=POST \
    --attempt-deadline=300s

設定ができたら、下記でジョブが設定できているか確認できます。

gcloud scheduler jobs describe [job name] \
    --location=asia-northeast1

まとめ

以上が今回設定したNotionのDatabaseをNotion APIGoogle Functionsを使ってBigQueryにアップロードし、Could Schedulerを使って定期実行する方法です。

長くなってしまったので詳細については触れられていない部分も多いですが、設定は大変でした。

特にGoogle Cloudの権限周りがとても細かく、何度もPERMISSION_DENIEDが表示されて、その度にエラーメッセージから必要な権限を読み個人アカウントに切り替えてサービスアカウントに権限を割り当てるということを繰り返してしまいました。

最初からどのような権限が必要になるのか調べて割り当てていればよかったと反省しています。

ただ、勉強にはなったので、とてもいい経験でした。

Cloud Functionsで使うPythonコードの作成ができてしまえば、Google CloudのGUIで設定できると思います。

今回はNotion Databaseで行いましたが、他のサービスの監査ログの収集などでも活用できそうです。。

最後までお読みいただきありがとうございます。間違いがあればぜひ教えてください。