Building a Daily Data Pipeline from RDS Aurora to BigQuery with Terraform

I had the opportunity to build a pipeline that transfers data from AWS RDS Aurora to GCP BigQuery on a daily basis, so I’m documenting the architecture and implementation. Since many implementations online use CDC and other high-overhead approaches, I focused on a cost-effective solution based on RDS Snapshot S3 export that doesn’t require additional RDS configuration.

Overview

  • Export to S3 in Parquet format using RDS Export to S3 feature
  • Transfer from S3 → GCS using Storage Transfer Service
  • Reorganize path structure for BigQuery external tables using Cloud Functions
  • Reference Parquet files on GCS via BigQuery external tables
  • Entire infrastructure as code with Terraform

Architecture

+---------------------------------------------------------------------------------------------+
|                                          AWS                                                |
|                                                                                             |
|  +---------------------+     +-----------------------------------------------------------+  |
|  | EventBridge         |     | Step Functions (rds2bq-rds2s3)                            |  |
|  | Scheduler           |     |                                                           |  |
|  |                     |     |  +-----------+   +------------+   +-----------------+     |  |
|  | Trigger at      ------------>|BuildPrefix|-->|CreateSnap- |-->|DescribeSnapshot |     |  |
|  | JST 00:30           |     |  |(parse dt) |   |shot        |   |(poll until done)|     |  |
|  +---------------------+     |  +-----------+   +------------+   +--------+--------+     |  |
|                              |                                           |               |  |
|                              |                                           v               |  |
|                              |                                  +-----------------+      |  |
|                              |                                  | StartExportTask |      |  |
|                              |                                  |(start S3 export)|      |  |
|                              |                                  +--------+--------+      |  |
|                              +-------------------------------------------+---------------+  |
|                                                                          |                  |
|  +---------------------+                                                 |                  |
|  | RDS Aurora          |                                                 v                  |
|  |                     |     +------------------------------------------------------+       |
|  |  +---------------+  |     | RDS Export Task (async, 15-30 min)                   |       |
|  |  |   Database    |------->|  Snapshot -> Parquet conversion -> S3 output         |       |
|  |  +---------------+  |     |  (KMS encrypted)                                     |       |
|  +---------------------+     +----------------------------------+-------------------+       |
|                                                                 |                           |
|                                                                 v                           |
|                              +------------------------------------------------------+       |
|                              | S3 Bucket                                            |       |
|                              |  exports/year=YYYY/month=MM/date=DD/.../*.parquet    |       |
|                              +----------------------------------+-------------------+       |
|                                                                 |                           |
+---------------------------------------------------------------------------------------------+
                                                                  |
                                                                  | Storage Transfer Service
                                                                  | (hourly)
                                                                  v
+---------------------------------------------------------------------------------------------+
|                                          GCP                                                |
|                                                                                             |
|                              +------------------------------------------------------+       |
|                              | GCS Bucket                                           |       |
|                              |  exports/        (raw data copied from S3)           |       |
|                              |  reorganized/    (reorganized for BigQuery)          |       |
|                              +----------------------------------+-------------------+       |
|                                                                 |                           |
|                                       Eventarc (object.finalized)                           |
|                                                                 v                           |
|                              +------------------------------------------------------+       |
|                              | Cloud Functions (rds2bq-reorganizer)                 |       |
|                              |  Copy: exports/ -> reorganized/                      |       |
|                              +----------------------------------+-------------------+       |
|                                                                 |                           |
|                                                                 v                           |
|                              +------------------------------------------------------+       |
|                              | BigQuery (External Tables)                           |       |
|                              |  -> references reorganized/.../*.parquet             |       |
|                              +------------------------------------------------------+       |
|                                                                                             |
+---------------------------------------------------------------------------------------------+

Estimated Processing Time

Measurements from my test environment (1 GiB RDS Snapshot):

Process                       Duration
RDS Snapshot creation         ~3 min
S3 Export Task queue wait     ~15 min
S3 Export Task execution      ~15 min
S3 → GCS transfer             ~30 sec
Total                         ~30-35 min

Key Components

Component                   Role
AWS KMS                     Encryption key for RDS Export
S3 Bucket                   Export destination (TLS/KMS required, auto-delete after 30 days)
Step Functions              Orchestration of snapshot creation → Export
EventBridge Scheduler       Daily scheduler (JST 00:30)
Storage Transfer Service    Periodic S3 → GCS transfer (hourly)
Cloud Functions             Parquet file path reorganization
BigQuery                    References Parquet as external tables

Prerequisites

  • Permissions to create resources with Terraform in both AWS and GCP
  • An existing RDS Aurora cluster
  • Cloud Functions source code (described later) placed in ./functions/rds2bq-reorganizer/

Directory Structure

.
├── main.tf          # Terraform code from this article
└── functions/
    └── rds2bq-reorganizer/
        ├── main.go  # Cloud Functions source code
        └── go.mod

Variable Definitions

variable "gcp_project_id" {
  description = "GCP Project ID"
  type        = string
}

variable "aws_region" {
  description = "AWS Region"
  type        = string
  default     = "ap-northeast-1"
}

variable "gcp_region" {
  description = "GCP Region"
  type        = string
  default     = "asia-northeast1"
}

variable "rds_cluster_identifier" {
  description = "RDS Aurora cluster identifier"
  type        = string
}

variable "database_name" {
  description = "Database name to transfer"
  type        = string
}

variable "table_names" {
  description = "List of table names to transfer"
  type        = list(string)
}

Example terraform.tfvars:

gcp_project_id         = "my-gcp-project"
aws_region             = "ap-northeast-1"
gcp_region             = "asia-northeast1"
rds_cluster_identifier = "my-aurora-cluster"
database_name          = "mydb"
table_names = [
  "users",
  "orders",
  "products",
]

Data Sources

data "aws_caller_identity" "current" {}

data "google_project" "current" {
  project_id = var.gcp_project_id
}

data "google_storage_transfer_project_service_account" "current" {
  project = data.google_project.current.project_id
}

locals {
  rds2bq_export_path_prefix = "exports"
}

Before using Storage Transfer Service, enable the service account with the following command:

gcloud transfer authorize --add-missing --project=${GCP_PROJECT_ID}

AWS Implementation

Creating the KMS Key

The RDS Export to S3 feature requires KMS encryption.

resource "aws_kms_key" "rds2bq" {
  description         = "KMS for rds2bq"
  enable_key_rotation = true
}

resource "aws_kms_alias" "rds2bq" {
  name          = "alias/rds2bq"
  target_key_id = aws_kms_key.rds2bq.key_id
}

resource "aws_kms_key_policy" "rds2bq" {
  key_id = aws_kms_key.rds2bq.key_id
  policy = data.aws_iam_policy_document.rds2bq_kms_key_policy.json
}

data "aws_iam_policy_document" "rds2bq_kms_key_policy" {
  # Administrator permissions
  statement {
    sid    = "AllowRootAccountAdmin"
    effect = "Allow"
    principals {
      type        = "AWS"
      identifiers = ["arn:aws:iam::${data.aws_caller_identity.current.account_id}:root"]
    }
    actions   = ["kms:*"]
    resources = ["*"]
  }

  # Allow CreateGrant for RDS Export service
  statement {
    sid    = "AllowRdsExportCreateGrant"
    effect = "Allow"
    principals {
      type        = "Service"
      identifiers = ["export.rds.amazonaws.com"]
    }
    actions   = ["kms:CreateGrant", "kms:DescribeKey"]
    resources = ["*"]
  }

  # Allow DescribeKey for Scheduler role
  statement {
    sid    = "AllowDescribeKeyToSchedulerRole"
    effect = "Allow"
    principals {
      type        = "AWS"
      identifiers = [aws_iam_role.rds2bq_rds2s3_scheduler.arn]
    }
    actions   = ["kms:DescribeKey"]
    resources = ["*"]
  }

  # Allow Decrypt for S3→GCS transfer role
  statement {
    sid    = "AllowDecryptToImportRole"
    effect = "Allow"
    principals {
      type        = "AWS"
      identifiers = [aws_iam_role.rds2bq_s32gcs.arn]
    }
    actions   = ["kms:Decrypt", "kms:DescribeKey"]
    resources = ["*"]
  }
}

Creating the S3 Bucket

The S3 bucket for exports. The following security settings are configured:

  • Completely block public access
  • Require TLS
  • Require encryption with specified KMS key
  • Allow writes only from the Export role
  • Auto-delete after 30 days

resource "aws_s3_bucket" "rds2bq" {
  bucket        = "rds2bq-${data.aws_caller_identity.current.account_id}-${var.aws_region}"
  force_destroy = true
}

resource "aws_s3_bucket_public_access_block" "rds2bq" {
  bucket                  = aws_s3_bucket.rds2bq.id
  block_public_acls       = true
  block_public_policy     = true
  ignore_public_acls      = true
  restrict_public_buckets = true
}

resource "aws_s3_bucket_lifecycle_configuration" "rds2bq" {
  bucket = aws_s3_bucket.rds2bq.id
  rule {
    id     = "expire-30d"
    status = "Enabled"
    expiration { days = 30 }
  }
}

resource "aws_s3_bucket_policy" "rds2bq" {
  bucket = aws_s3_bucket.rds2bq.id
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid       = "DenyNonTLS"
        Effect    = "Deny"
        Principal = "*"
        Action    = "s3:*"
        Resource  = [aws_s3_bucket.rds2bq.arn, "${aws_s3_bucket.rds2bq.arn}/*"]
        Condition = { Bool = { "aws:SecureTransport" = "false" } }
      },
      {
        Sid       = "DenyPutWithoutSSEKMS"
        Effect    = "Deny"
        Principal = "*"
        Action    = "s3:PutObject"
        Resource  = "${aws_s3_bucket.rds2bq.arn}/${local.rds2bq_export_path_prefix}/*"
        Condition = { StringNotEquals = { "s3:x-amz-server-side-encryption" = "aws:kms" } }
      },
      {
        Sid       = "DenyPutWithWrongKmsKey"
        Effect    = "Deny"
        Principal = "*"
        Action    = "s3:PutObject"
        Resource  = "${aws_s3_bucket.rds2bq.arn}/${local.rds2bq_export_path_prefix}/*"
        Condition = {
          StringNotEquals = {
            "s3:x-amz-server-side-encryption-aws-kms-key-id" = [
              aws_kms_key.rds2bq.arn,
              aws_kms_key.rds2bq.key_id,
              aws_kms_alias.rds2bq.arn
            ]
          }
        }
      },
      {
        Sid       = "DenyPutIfNotExportRole"
        Effect    = "Deny"
        Principal = "*"
        Action    = ["s3:PutObject"]
        Resource  = "${aws_s3_bucket.rds2bq.arn}/${local.rds2bq_export_path_prefix}/*"
        Condition = {
          StringNotEquals = {
            "aws:PrincipalArn" = aws_iam_role.rds2bq_rds2s3_rdsexport.arn
          }
        }
      }
    ]
  })
}

IAM Role for RDS Export

Create a role that export.rds.amazonaws.com can AssumeRole:

resource "aws_iam_role" "rds2bq_rds2s3_rdsexport" {
  name = "rds2bq-rds2s3-rdsexport"
  assume_role_policy = jsonencode({
    Version = "2012-10-17",
    Statement = [{
      Sid       = "OnlyRdsExportServiceInThisAccount",
      Effect    = "Allow",
      Principal = { Service = "export.rds.amazonaws.com" },
      Action    = "sts:AssumeRole",
      Condition = { StringEquals = { "aws:SourceAccount" = data.aws_caller_identity.current.account_id } }
    }]
  })
}

resource "aws_iam_policy" "rds2bq_rds2s3_rdsexport" {
  name = "rds2bq-rds2s3-rdsexport"
  policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        Sid      = "ObjectRWUnderExportsPrefix",
        Effect   = "Allow",
        Action   = ["s3:PutObject*", "s3:GetObject*", "s3:DeleteObject*", "s3:AbortMultipartUpload", "s3:ListMultipartUploadParts"],
        Resource = "${aws_s3_bucket.rds2bq.arn}/${local.rds2bq_export_path_prefix}/*"
      },
      {
        Sid      = "ListBucketWithPrefix",
        Effect   = "Allow",
        Action   = ["s3:ListBucket", "s3:GetBucketLocation", "s3:ListBucketMultipartUploads"],
        Resource = aws_s3_bucket.rds2bq.arn
      },
      {
        Sid      = "UseKmsKeyForSnapshotExport",
        Effect   = "Allow",
        Action   = ["kms:Encrypt", "kms:Decrypt", "kms:ReEncrypt*", "kms:GenerateDataKey*", "kms:DescribeKey"],
        Resource = aws_kms_key.rds2bq.arn
      }
    ]
  })
}

resource "aws_iam_role_policy_attachment" "rds2bq_rds2s3_rdsexport" {
  role       = aws_iam_role.rds2bq_rds2s3_rdsexport.name
  policy_arn = aws_iam_policy.rds2bq_rds2s3_rdsexport.arn
}

Step Functions

Since RDS Export to S3 is asynchronous, we use Step Functions for orchestration.

The flow is as follows:

  1. BuildPrefix: Generate date partition path from scheduled time
  2. CreateSnapshot: Create DB cluster snapshot
  3. DescribeSnapshot: Poll snapshot status
  4. StartExport: Start S3 Export Task

resource "aws_iam_role" "rds2bq_rds2s3" {
  name = "rds2bq-rds2s3"
  assume_role_policy = jsonencode({
    Version   = "2012-10-17",
    Statement = [{ Effect = "Allow", Principal = { Service = "states.amazonaws.com" }, Action = "sts:AssumeRole" }]
  })
}

resource "aws_iam_policy" "rds2bq_rds2s3" {
  name = "rds2bq-rds2s3"
  policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      { Sid = "StartExport", Effect = "Allow", Action = ["rds:StartExportTask"], Resource = "*" },
      { Sid = "CreateSnapshot", Effect = "Allow", Action = ["rds:CreateDBClusterSnapshot"], Resource = "*" },
      { Sid = "DescribeSnapshot", Effect = "Allow", Action = ["rds:DescribeDBClusterSnapshots"], Resource = "*" },
      { Sid = "TagSnapshot", Effect = "Allow", Action = ["rds:AddTagsToResource", "rds:ListTagsForResource", "rds:RemoveTagsFromResource"], Resource = "*" },
      {
        Sid       = "PassExportRole",
        Effect    = "Allow",
        Action    = "iam:PassRole",
        Resource  = aws_iam_role.rds2bq_rds2s3_rdsexport.arn,
        Condition = { StringEquals = { "iam:PassedToService" = "rds.amazonaws.com" } }
      },
      { Sid = "AllowKmsCreateGrantForExport", Effect = "Allow", Action = ["kms:CreateGrant", "kms:DescribeKey"], Resource = aws_kms_key.rds2bq.arn },
      { Sid = "AllowCloudWatchLogsDelivery", Effect = "Allow", Action = ["logs:CreateLogDelivery", "logs:GetLogDelivery", "logs:UpdateLogDelivery", "logs:DeleteLogDelivery", "logs:ListLogDeliveries", "logs:PutResourcePolicy", "logs:DescribeResourcePolicies", "logs:DescribeLogGroups"], Resource = "*" }
    ]
  })
}

resource "aws_iam_role_policy_attachment" "rds2bq_rds2s3" {
  role       = aws_iam_role.rds2bq_rds2s3.name
  policy_arn = aws_iam_policy.rds2bq_rds2s3.arn
}

resource "aws_cloudwatch_log_group" "rds2bq_rds2s3" {
  name              = "/aws/states/rds2bq-rds2s3"
  retention_in_days = 30
}

resource "aws_sfn_state_machine" "rds2bq_rds2s3" {
  name     = "rds2bq-rds2s3"
  role_arn = aws_iam_role.rds2bq_rds2s3.arn
  definition = jsonencode({
    Comment = "Build date partition prefix, create/await DB cluster snapshot, then StartExportTask",
    StartAt = "BuildPrefix",
    States = {
      # $.scheduled_time is assumed to be ISO8601 (e.g., 2026-01-08T18:42:21Z)
      BuildPrefix = {
        Type = "Pass",
        Parameters = {
          "year.$"   = "States.ArrayGetItem(States.StringSplit($.scheduled_time, '-'), 0)",
          "month.$"  = "States.ArrayGetItem(States.StringSplit($.scheduled_time, '-'), 1)",
          "day.$"    = "States.ArrayGetItem(States.StringSplit(States.ArrayGetItem(States.StringSplit($.scheduled_time, 'T'), 0), '-'), 2)",
          "hour.$"   = "States.ArrayGetItem(States.StringSplit(States.ArrayGetItem(States.StringSplit($.scheduled_time, 'T'), 1), ':'), 0)",
          "minute.$" = "States.ArrayGetItem(States.StringSplit(States.ArrayGetItem(States.StringSplit($.scheduled_time, 'T'), 1), ':'), 1)",
          "second.$" = "States.ArrayGetItem(States.StringSplit(States.ArrayGetItem(States.StringSplit(States.ArrayGetItem(States.StringSplit($.scheduled_time, 'T'), 1), ':'), 2), '.Zz+-'), 0)"
        },
        ResultPath = "$.built",
        Next       = "CreateSnapshot"
      },

      CreateSnapshot = {
        Type     = "Task",
        Resource = "arn:aws:states:::aws-sdk:rds:createDBClusterSnapshot",
        Parameters = {
          "DbClusterIdentifier"           = var.rds_cluster_identifier,
          "DbClusterSnapshotIdentifier.$" = "States.Format('rds2bq-{}{}{}t{}{}{}', $.built.year, $.built.month, $.built.day, $.built.hour, $.built.minute, $.built.second)"
        },
        ResultPath = "$.create",
        Catch = [{
          ErrorEquals = ["States.ALL"],
          ResultPath  = "$.create_error",
          Next        = "WaitBeforeDescribe"
        }],
        Next = "WaitBeforeDescribe"
      },

      WaitBeforeDescribe = { Type = "Wait", Seconds = 20, Next = "DescribeSnapshot" },

      DescribeSnapshot = {
        Type     = "Task",
        Resource = "arn:aws:states:::aws-sdk:rds:describeDBClusterSnapshots",
        Parameters = {
          "DbClusterSnapshotIdentifier.$" = "States.Format('rds2bq-{}{}{}t{}{}{}', $.built.year, $.built.month, $.built.day, $.built.hour, $.built.minute, $.built.second)"
        },
        ResultPath = "$.desc",
        Catch = [{
          ErrorEquals = ["States.ALL"],
          ResultPath  = "$.describe_error",
          Next        = "WaitAndRetryDescribe"
        }],
        Next = "CheckSnapshotStatus"
      },

      CheckSnapshotStatus = {
        Type = "Choice",
        Choices = [
          { Variable = "$.desc.DbClusterSnapshots[0].Status", StringEquals = "available", Next = "StartExport" },
          { Variable = "$.desc.DbClusterSnapshots[0].Status", StringEquals = "failed", Next = "FailSnapshot" }
        ],
        Default = "WaitAndRetryDescribe"
      },

      WaitAndRetryDescribe = { Type = "Wait", Seconds = 15, Next = "DescribeSnapshot" },

      StartExport = {
        Type     = "Task",
        Resource = "arn:aws:states:::aws-sdk:rds:startExportTask",
        Parameters = {
          "ExportTaskIdentifier.$" = "States.Format('rds2bq-{}{}{}t{}{}{}', $.built.year, $.built.month, $.built.day, $.built.hour, $.built.minute, $.built.second)",
          "SourceArn.$"            = "$.desc.DbClusterSnapshots[0].DbClusterSnapshotArn",
          "S3BucketName"           = aws_s3_bucket.rds2bq.bucket,
          "S3Prefix.$"             = "States.Format('${local.rds2bq_export_path_prefix}/year={}/month={}/date={}', $.built.year, $.built.month, $.built.day)",
          "IamRoleArn"             = aws_iam_role.rds2bq_rds2s3_rdsexport.arn,
          "KmsKeyId"               = aws_kms_key.rds2bq.arn
        },
        End = true
      },

      FailSnapshot = { Type = "Fail", Error = "SnapshotFailed", Cause = "DB cluster snapshot status=failed" }
    }
  })
  logging_configuration {
    include_execution_data = true
    level                  = "ALL"
    log_destination        = "${aws_cloudwatch_log_group.rds2bq_rds2s3.arn}:*"
  }
}

The complexity of second.$ in BuildPrefix comes from the timezone designator (Z or +09:00) at the end of the ISO 8601 timestamp: simply splitting by : would leave those extra characters attached to the seconds value, as illustrated below.
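
To make the nested intrinsics easier to follow, here is a small standalone Go sketch (an illustration only, not part of the pipeline) of what BuildPrefix effectively does with the seconds field:

package main

import (
	"fmt"
	"strings"
)

func main() {
	scheduled := "2026-01-08T18:42:21Z" // example value of $.scheduled_time

	// Naive approach: splitting the time part by ':' leaves the timezone
	// designator attached to the seconds value.
	timePart := strings.Split(scheduled, "T")[1] // "18:42:21Z"
	fmt.Println(strings.Split(timePart, ":")[2]) // "21Z" -- not usable in the snapshot identifier

	// What the nested States.StringSplit does: split the seconds field again
	// on any of '.', 'Z', 'z', '+', '-' (StringSplit treats its second argument
	// as a set of delimiting characters) and take the first element.
	seconds := strings.FieldsFunc(strings.Split(timePart, ":")[2], func(r rune) bool {
		return strings.ContainsRune(".Zz+-", r)
	})[0]
	fmt.Println(seconds) // "21"
}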

EventBridge Scheduler

Scheduler to trigger Step Functions at JST 00:30:

resource "aws_iam_role" "rds2bq_rds2s3_scheduler" {
  name = "rds2bq-rds2s3-scheduler"
  assume_role_policy = jsonencode({
    Version = "2012-10-17",
    Statement = [{
      Effect    = "Allow",
      Principal = { Service = "scheduler.amazonaws.com" },
      Action    = "sts:AssumeRole"
    }]
  })
}

resource "aws_iam_policy" "rds2bq_rds2s3_scheduler" {
  name = "rds2bq-rds2s3-scheduler"
  policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      { Sid = "AllowStartExportTask", Effect = "Allow", Action = ["rds:StartExportTask"], Resource = "*" },
      { Sid = "AllowStartStepFn", Effect = "Allow", Action = ["states:StartExecution"], Resource = aws_sfn_state_machine.rds2bq_rds2s3.arn },
      {
        Sid      = "AllowPassOnlyExportRoleToRds",
        Effect   = "Allow",
        Action   = "iam:PassRole",
        Resource = aws_iam_role.rds2bq_rds2s3_rdsexport.arn,
        Condition = { StringEquals = { "iam:PassedToService" = "rds.amazonaws.com" } }
      }
    ]
  })
}

resource "aws_iam_role_policy_attachment" "rds2bq_rds2s3_scheduler" {
  role       = aws_iam_role.rds2bq_rds2s3_scheduler.name
  policy_arn = aws_iam_policy.rds2bq_rds2s3_scheduler.arn
}

resource "aws_scheduler_schedule" "rds2bq_rds2s3" {
  name                = "rds2bq-rds2s3"
  group_name          = "default"
  description         = "Daily: Step Functions -> RDS StartExportTask"
  schedule_expression = "cron(30 15 * * ? *)" # UTC 15:30 = JST 00:30
  flexible_time_window { mode = "OFF" }

  target {
    arn      = "arn:aws:scheduler:::aws-sdk:sfn:startExecution"
    role_arn = aws_iam_role.rds2bq_rds2s3_scheduler.arn
    # NOTE: Using heredoc because jsonencode escapes <aws.scheduler.scheduled-time>
    input = <<-EOF
    {
      "StateMachineArn": "${aws_sfn_state_machine.rds2bq_rds2s3.arn}",
      "Input": "{\"scheduled_time\":\"<aws.scheduler.scheduled-time>\"}"
    }
    EOF
  }
}

Using jsonencode escapes <aws.scheduler.scheduled-time> to \u003c...\u003e, which prevents EventBridge Scheduler from recognizing it as a context variable. A heredoc must be used instead.
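
For reference, jsonencode's escaping matches the default behavior of Go's encoding/json. The following standalone Go snippet (an illustration, not part of the pipeline) reproduces the mangling and shows that disabling HTML escaping preserves the placeholder, which is what the heredoc achieves on the Terraform side:

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

func main() {
	payload := map[string]string{
		"Input": `{"scheduled_time":"<aws.scheduler.scheduled-time>"}`,
	}

	// Default behavior: '<' and '>' become \u003c and \u003e, so EventBridge
	// Scheduler no longer sees the literal placeholder.
	escaped, _ := json.Marshal(payload)
	fmt.Println(string(escaped))

	// With HTML escaping disabled, the placeholder survives intact.
	var buf bytes.Buffer
	enc := json.NewEncoder(&buf)
	enc.SetEscapeHTML(false)
	_ = enc.Encode(payload)
	fmt.Println(buf.String())
}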

IAM Role for S3 → GCS Transfer

This IAM role allows Storage Transfer Service to access AWS S3. It uses Web Identity Federation to trust the Google-managed Storage Transfer service account:

resource "aws_iam_role" "rds2bq_s32gcs" {
  name = "rds2bq-s32gcs"
  assume_role_policy = jsonencode({
    "Version" : "2012-10-17",
    "Statement" : [{
      "Effect" : "Allow",
      "Principal" : { "Federated" : "accounts.google.com" },
      "Action" : "sts:AssumeRoleWithWebIdentity",
      "Condition" : {
        "StringEquals" : {
          "accounts.google.com:sub" : data.google_storage_transfer_project_service_account.current.subject_id
        }
      }
    }]
  })
}

resource "aws_iam_policy" "rds2bq_s32gcs" {
  name = "rds2bq-s32gcs"
  policy = jsonencode({
    "Version" : "2012-10-17",
    "Statement" : [{
      "Effect" : "Allow",
      "Action" : ["s3:Get*", "s3:List*", "s3:Delete*"],
      "Resource" : "*"
    }]
  })
}

resource "aws_iam_role_policy_attachment" "rds2bq_s32gcs" {
  role       = aws_iam_role.rds2bq_s32gcs.name
  policy_arn = aws_iam_policy.rds2bq_s32gcs.arn
}

GCP Implementation

Storage Transfer Service Permissions

resource "google_project_iam_member" "storage_transfer_storage_admin" {
  project = data.google_project.current.project_id
  role    = "roles/storage.admin"
  member  = data.google_storage_transfer_project_service_account.current.member
}

GCS Bucket and Storage Transfer Job

resource "google_storage_bucket" "rds2bq" {
  name                        = "${data.google_project.current.project_id}-rds2bq"
  location                    = var.gcp_region
  uniform_bucket_level_access = true
  public_access_prevention    = "enforced"
  lifecycle_rule {
    action { type = "Delete" }
    condition { age = 30 }
  }
}

resource "google_storage_bucket_iam_member" "rds2bq_storage_transfer" {
  bucket = google_storage_bucket.rds2bq.name
  role   = "roles/storage.objectAdmin"
  member = data.google_storage_transfer_project_service_account.current.member
}

resource "google_storage_transfer_job" "rds2bq_s32gcs_daily" {
  description = "Daily copy Parquet from AWS S3 to GCS"
  project     = data.google_project.current.project_id
  transfer_spec {
    aws_s3_data_source {
      bucket_name             = aws_s3_bucket.rds2bq.bucket
      managed_private_network = true
      role_arn                = aws_iam_role.rds2bq_s32gcs.arn
    }
    gcs_data_sink {
      bucket_name = google_storage_bucket.rds2bq.name
    }
    object_conditions {
      include_prefixes = ["${local.rds2bq_export_path_prefix}/"]
    }
    transfer_options {
      overwrite_objects_already_existing_in_sink = false
    }
  }
  schedule {
    schedule_start_date {
      year  = 2025
      month = 1
      day   = 1
    }
    repeat_interval = "3600s" # hourly
  }
}

Cloud Functions (Path Reorganization)

The output path structure from RDS Export to S3 is as follows:

exports/year=2026/month=01/date=09/rds2bq-20260109t003000/{database}/{database}.{table}/{partition}/part-00000.parquet

This is transformed to the following structure for easier reference by BigQuery external tables:

reorganized/dbcluster/{cluster_id}/database/{database}/table/{table}/part-00000.parquet

Cloud Functions Source Code

./functions/rds2bq-reorganizer/main.go:

package rds2bqreorganizer

import (
	"context"
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"regexp"
	"strings"

	"cloud.google.com/go/storage"
	"github.com/GoogleCloudPlatform/functions-framework-go/functions"
	"github.com/cloudevents/sdk-go/v2/event"
)

func init() {
	functions.CloudEvent("ReorganizeParquet", reorganizeParquet)
}

type StorageObjectData struct {
	Bucket string `json:"bucket"`
	Name   string `json:"name"`
}

func reorganizeParquet(ctx context.Context, e event.Event) error {
	var data StorageObjectData
	if err := e.DataAs(&data); err != nil {
		return fmt.Errorf("event.DataAs: %w", err)
	}

	// Only process .parquet files under exports/
	if !strings.HasPrefix(data.Name, "exports/") || !strings.HasSuffix(data.Name, ".parquet") {
		return nil
	}

	clusterID := os.Getenv("DB_CLUSTER_IDENTIFIER")
	if clusterID == "" {
		return errors.New("DB_CLUSTER_IDENTIFIER environment variable is not set")
	}

	newPath, err := buildNewPath(data.Name, clusterID)
	if err != nil {
		return nil // Skip files that cannot be parsed
	}

	client, err := storage.NewClient(ctx)
	if err != nil {
		return fmt.Errorf("storage.NewClient: %w", err)
	}
	defer client.Close()

	src := client.Bucket(data.Bucket).Object(data.Name)
	dst := client.Bucket(data.Bucket).Object(newPath)

	if _, err := dst.CopierFrom(src).Run(ctx); err != nil {
		return fmt.Errorf("copy failed: %w", err)
	}

	return nil
}

var pathPattern = regexp.MustCompile(
	`^exports/year=(\d{4})/month=(\d{2})/date=(\d{2})/([^/]+)/([^/]+)/([^/]+)/([^/]+)/(.+\.parquet)$`,
)

func buildNewPath(srcPath, clusterID string) (string, error) {
	matches := pathPattern.FindStringSubmatch(srcPath)
	if matches == nil {
		return "", fmt.Errorf("path does not match expected pattern: %s", srcPath)
	}

	database := matches[5]
	dbTable := matches[6] // database.table format
	filename := matches[8]

	// Extract table name from database.table
	table := strings.TrimPrefix(dbTable, database+".")

	return filepath.Join(
		"reorganized",
		"dbcluster", clusterID,
		"database", database,
		"table", table,
		filename,
	), nil
}

./functions/rds2bq-reorganizer/go.mod:

module rds2bqreorganizer

go 1.24

require (
	cloud.google.com/go/storage v1.43.0
	github.com/GoogleCloudPlatform/functions-framework-go v1.8.1
	github.com/cloudevents/sdk-go/v2 v2.15.2
)
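
The path transformation can be verified locally with a small test. A minimal main_test.go in the same directory (a sketch of mine, not part of the original deployment) could look like this, using the example cluster identifier from terraform.tfvars:

package rds2bqreorganizer

import "testing"

func TestBuildNewPath(t *testing.T) {
	// The source path follows the RDS Export to S3 layout shown earlier; the
	// expected result assumes the Linux path separator used in the Cloud
	// Functions runtime.
	src := "exports/year=2026/month=01/date=09/rds2bq-20260109t003000/mydb/mydb.users/1/part-00000.parquet"
	want := "reorganized/dbcluster/my-aurora-cluster/database/mydb/table/users/part-00000.parquet"

	got, err := buildNewPath(src, "my-aurora-cluster")
	if err != nil {
		t.Fatalf("buildNewPath returned error: %v", err)
	}
	if got != want {
		t.Errorf("got %q, want %q", got, want)
	}

	// Paths outside the expected export layout should be rejected.
	if _, err := buildNewPath("exports/other/file.parquet", "my-aurora-cluster"); err == nil {
		t.Errorf("expected an error for a non-matching path")
	}
}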

Deploying Cloud Functions with Terraform

# Bucket for Cloud Functions source code
resource "google_storage_bucket" "rds2bq_functions" {
  name                        = "${data.google_project.current.project_id}-rds2bq-functions"
  location                    = var.gcp_region
  uniform_bucket_level_access = true
  public_access_prevention    = "enforced"
}

# ZIP the source code
data "archive_file" "rds2bq_reorganizer" {
  type        = "zip"
  source_dir  = "${path.module}/functions/rds2bq-reorganizer"
  output_path = "${path.module}/functions/rds2bq-reorganizer.zip"
}

resource "google_storage_bucket_object" "rds2bq_reorganizer" {
  name   = "rds2bq-reorganizer-${data.archive_file.rds2bq_reorganizer.output_md5}.zip"
  bucket = google_storage_bucket.rds2bq_functions.name
  source = data.archive_file.rds2bq_reorganizer.output_path
}

# Service account for Cloud Functions execution
resource "google_service_account" "rds2bq_reorganizer" {
  account_id   = "rds2bq-reorganizer"
  display_name = "Cloud Function rds2bq-reorganizer"
}

resource "google_storage_bucket_iam_member" "rds2bq_reorganizer_object_admin" {
  bucket = google_storage_bucket.rds2bq.name
  role   = "roles/storage.objectAdmin"
  member = google_service_account.rds2bq_reorganizer.member
}

resource "google_project_iam_member" "rds2bq_reorganizer_event_receiver" {
  project = data.google_project.current.project_id
  role    = "roles/eventarc.eventReceiver"
  member  = google_service_account.rds2bq_reorganizer.member
}

# Cloud Functions (2nd gen)
resource "google_cloudfunctions2_function" "rds2bq_reorganizer" {
  name     = "rds2bq-reorganizer"
  location = var.gcp_region
  build_config {
    runtime     = "go124"
    entry_point = "ReorganizeParquet"
    source {
      storage_source {
        bucket = google_storage_bucket.rds2bq_functions.name
        object = google_storage_bucket_object.rds2bq_reorganizer.name
      }
    }
  }
  service_config {
    max_instance_count    = 10
    available_memory      = "256Mi"
    timeout_seconds       = 60
    service_account_email = google_service_account.rds2bq_reorganizer.email
    environment_variables = {
      DB_CLUSTER_IDENTIFIER = var.rds_cluster_identifier
    }
  }
  event_trigger {
    trigger_region        = var.gcp_region
    event_type            = "google.cloud.storage.object.v1.finalized"
    service_account_email = google_service_account.rds2bq_reorganizer.email
    event_filters {
      attribute = "bucket"
      value     = google_storage_bucket.rds2bq.name
    }
    retry_policy = "RETRY_POLICY_RETRY"
  }
}

resource "google_cloud_run_service_iam_member" "rds2bq_reorganizer_invoker" {
  location = google_cloudfunctions2_function.rds2bq_reorganizer.location
  service  = google_cloudfunctions2_function.rds2bq_reorganizer.name
  role     = "roles/run.invoker"
  member   = google_service_account.rds2bq_reorganizer.member
}

BigQuery External Tables

Create external tables that reference the reorganized paths:

resource "google_bigquery_dataset" "rds2bq" {
  dataset_id                 = "rds2bq_${var.database_name}"
  location                   = var.gcp_region
  delete_contents_on_destroy = false
}

resource "google_bigquery_table" "rds2bq" {
  for_each            = toset(var.table_names)
  dataset_id          = google_bigquery_dataset.rds2bq.dataset_id
  table_id            = each.key
  deletion_protection = false
  external_data_configuration {
    source_format = "PARQUET"
    autodetect    = true
    source_uris = [
      "gs://${google_storage_bucket.rds2bq.name}/reorganized/dbcluster/${var.rds_cluster_identifier}/database/${var.database_name}/table/${each.key}/*.parquet"
    ]
  }
}

Using for_each, external tables can be created in bulk from the list of table names.


Additional Notes

  • RDS Export to S3 is asynchronous with no completion notification, so Step Functions polls for snapshot completion before starting the Export
  • Storage Transfer Service runs hourly. Export completion timing varies slightly day by day, so we allow some buffer
  • Cloud Functions is triggered by Eventarc’s object.finalized event whenever a Parquet file is created
  • BigQuery external tables read from GCS on every query, so loading into native tables may be preferable for frequently queried data (see the sketch after this list)
  • Both the S3 and GCS buckets have 30-day lifecycle rules, so exported data is deleted automatically
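
Regarding the note about native tables: a minimal sketch of materializing an external table into a native one with the BigQuery Go client (the target dataset rds2bq_mydb_native and the project ID are illustrative placeholders; a scheduled query would work just as well):

package main

import (
	"context"
	"fmt"
	"log"

	"cloud.google.com/go/bigquery"
)

func main() {
	ctx := context.Background()

	client, err := bigquery.NewClient(ctx, "my-gcp-project")
	if err != nil {
		log.Fatalf("bigquery.NewClient: %v", err)
	}
	defer client.Close()

	// Copy the external table's current contents into a native table so that
	// frequent queries do not re-read the Parquet files on GCS every time.
	q := client.Query(`
		CREATE OR REPLACE TABLE rds2bq_mydb_native.users AS
		SELECT * FROM rds2bq_mydb.users
	`)
	job, err := q.Run(ctx)
	if err != nil {
		log.Fatalf("query run: %v", err)
	}
	status, err := job.Wait(ctx)
	if err != nil {
		log.Fatalf("job wait: %v", err)
	}
	if err := status.Err(); err != nil {
		log.Fatalf("query failed: %v", err)
	}
	fmt.Println("materialized external table into a native table")
}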