I recently had the opportunity to build a pipeline that transfers data from AWS RDS Aurora to GCP BigQuery on a daily basis, so I’m documenting the architecture and implementation here. Many of the implementations you find online rely on CDC or other high-overhead approaches, so I went with a cost-effective design based on RDS snapshot export to S3 that requires no additional RDS configuration.
Overview
- Export to S3 in Parquet format using RDS Export to S3 feature
- Transfer from S3 → GCS using Storage Transfer Service
- Reorganize path structure for BigQuery external tables using Cloud Functions
- Reference Parquet files on GCS via BigQuery external tables
- Entire infrastructure as code with Terraform
Architecture
+---------------------------------------------------------------------------------------------+
| AWS |
| |
| +---------------------+ +-----------------------------------------------------------+ |
| | EventBridge | | Step Functions (rds2bq-rds2s3) | |
| | Scheduler | | | |
| | | | +-----------+ +------------+ +-----------------+ | |
| | Trigger at ------------>|BuildPrefix|-->|CreateSnap- |-->|DescribeSnapshot | | |
| | JST 00:30 | | |(parse dt) | |shot | |(poll until done)| | |
| +---------------------+ | +-----------+ +------------+ +--------+--------+ | |
| | | | |
| | v | |
| | +-----------------+ | |
| | | StartExportTask | | |
| | | (start S3 export)| | |
| | +--------+--------+ | |
| +-------------------------------------------+---------------+ |
| | |
| +---------------------+ | |
| | RDS Aurora | v |
| | | +------------------------------------------------------+ |
| | +---------------+ | | RDS Export Task (async, 15-30 min) | |
| | | Database |------->| Snapshot -> Parquet conversion -> S3 output | |
| | +---------------+ | | (KMS encrypted) | |
| +---------------------+ +----------------------------------+-------------------+ |
| | |
| v |
| +------------------------------------------------------+ |
| | S3 Bucket | |
| | exports/year=YYYY/month=MM/date=DD/.../*.parquet | |
| +----------------------------------+-------------------+ |
| | |
+---------------------------------------------------------------------------------------------+
|
| Storage Transfer Service
| (hourly)
v
+---------------------------------------------------------------------------------------------+
| GCP |
| |
| +------------------------------------------------------+ |
| | GCS Bucket | |
| | exports/ (raw data copied from S3) | |
| | reorganized/ (reorganized for BigQuery) | |
| +----------------------------------+-------------------+ |
| | |
| Eventarc (object.finalized) |
| v |
| +------------------------------------------------------+ |
| | Cloud Functions (rds2bq-reorganizer) | |
| | Copy: exports/ -> reorganized/ | |
| +----------------------------------+-------------------+ |
| | |
| v |
| +------------------------------------------------------+ |
| | BigQuery (External Tables) | |
| | -> references reorganized/.../*.parquet | |
| +------------------------------------------------------+ |
| |
+---------------------------------------------------------------------------------------------+
Estimated Processing Time
Measurements from my test environment (1 GiB RDS Snapshot):
| Process | Duration |
|---|---|
| RDS Snapshot creation | ~3 min |
| S3 Export Task queue wait | ~15 min |
| S3 Export Task execution | ~15 min |
| S3 → GCS transfer | ~30 sec |
| Total | ~30-35 min |
Key Components
| Component | Role |
|---|---|
| AWS KMS | Encryption key for RDS Export |
| S3 Bucket | Export destination (TLS/KMS required, auto-delete after 30 days) |
| Step Functions | Orchestration of snapshot creation → Export |
| EventBridge Scheduler | Daily scheduler (JST 00:30) |
| Storage Transfer Service | Periodic S3 → GCS transfer (hourly) |
| Cloud Functions | Parquet file path reorganization |
| BigQuery | Reference Parquet as external tables |
Prerequisites
- Permissions to create resources with Terraform in both AWS and GCP
- An existing RDS Aurora cluster
- Cloud Functions source code (described later) placed in ./functions/rds2bq-reorganizer/
Directory Structure
.
├── main.tf # Terraform code from this article
└── functions/
└── rds2bq-reorganizer/
├── main.go # Cloud Functions source code
└── go.mod
Variable Definitions
variable "gcp_project_id" {
description = "GCP Project ID"
type = string
}
variable "aws_region" {
description = "AWS Region"
type = string
default = "ap-northeast-1"
}
variable "gcp_region" {
description = "GCP Region"
type = string
default = "asia-northeast1"
}
variable "rds_cluster_identifier" {
description = "RDS Aurora cluster identifier"
type = string
}
variable "database_name" {
description = "Database name to transfer"
type = string
}
variable "table_names" {
description = "List of table names to transfer"
type = list(string)
}
Example terraform.tfvars:
gcp_project_id = "my-gcp-project"
aws_region = "ap-northeast-1"
gcp_region = "asia-northeast1"
rds_cluster_identifier = "my-aurora-cluster"
database_name = "mydb"
table_names = [
"users",
"orders",
"products",
]
Data Sources
data "aws_caller_identity" "current" {}
data "google_project" "current" {
project_id = var.gcp_project_id
}
data "google_storage_transfer_project_service_account" "current" {
  project = data.google_project.current.project_id
}
locals {
rds2bq_export_path_prefix = "exports"
}
Before using Storage Transfer Service, set up its service account and grant the required permissions with the following command:
gcloud transfer authorize --add-missing --project=${GCP_PROJECT_ID}
AWS Implementation
Creating the KMS Key
The RDS Export to S3 feature requires KMS encryption.
resource "aws_kms_key" "rds2bq" {
description = "KMS for rds2bq"
enable_key_rotation = true
}
resource "aws_kms_alias" "rds2bq" {
name = "alias/rds2bq"
target_key_id = aws_kms_key.rds2bq.key_id
}
resource "aws_kms_key_policy" "rds2bq" {
key_id = aws_kms_key.rds2bq.key_id
policy = data.aws_iam_policy_document.rds2bq_kms_key_policy.json
}
data "aws_iam_policy_document" "rds2bq_kms_key_policy" {
# Administrator permissions
statement {
sid = "AllowRootAccountAdmin"
effect = "Allow"
principals {
type = "AWS"
identifiers = ["arn:aws:iam::${data.aws_caller_identity.current.account_id}:root"]
}
actions = ["kms:*"]
resources = ["*"]
}
# Allow CreateGrant for RDS Export service
statement {
sid = "AllowRdsExportCreateGrant"
effect = "Allow"
principals {
type = "Service"
identifiers = ["export.rds.amazonaws.com"]
}
actions = ["kms:CreateGrant", "kms:DescribeKey"]
resources = ["*"]
}
# Allow DescribeKey for Scheduler role
statement {
sid = "AllowDescribeKeyToSchedulerRole"
effect = "Allow"
principals {
type = "AWS"
identifiers = [aws_iam_role.rds2bq_rds2s3_scheduler.arn]
}
actions = ["kms:DescribeKey"]
resources = ["*"]
}
# Allow Decrypt for S3→GCS transfer role
statement {
sid = "AllowDecryptToImportRole"
effect = "Allow"
principals {
type = "AWS"
identifiers = [aws_iam_role.rds2bq_s32gcs.arn]
}
actions = ["kms:Decrypt", "kms:DescribeKey"]
resources = ["*"]
}
}
Creating the S3 Bucket
The S3 bucket for exports. The following security settings are configured:
- Completely block public access
- Require TLS
- Require encryption with specified KMS key
- Allow writes only from the Export role
- Auto-delete after 30 days
resource "aws_s3_bucket" "rds2bq" {
bucket = "rds2bq-${data.aws_caller_identity.current.account_id}-${var.aws_region}"
force_destroy = true
}
resource "aws_s3_bucket_public_access_block" "rds2bq" {
bucket = aws_s3_bucket.rds2bq.id
block_public_acls = true
block_public_policy = true
ignore_public_acls = true
restrict_public_buckets = true
}
resource "aws_s3_bucket_lifecycle_configuration" "rds2bq" {
bucket = aws_s3_bucket.rds2bq.id
rule {
id = "expire-30d"
status = "Enabled"
expiration { days = 30 }
}
}
resource "aws_s3_bucket_policy" "rds2bq" {
bucket = aws_s3_bucket.rds2bq.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Sid = "DenyNonTLS"
Effect = "Deny"
Principal = "*"
Action = "s3:*"
Resource = [aws_s3_bucket.rds2bq.arn, "${aws_s3_bucket.rds2bq.arn}/*"]
Condition = { Bool = { "aws:SecureTransport" = "false" } }
},
{
Sid = "DenyPutWithoutSSEKMS"
Effect = "Deny"
Principal = "*"
Action = "s3:PutObject"
Resource = "${aws_s3_bucket.rds2bq.arn}/${local.rds2bq_export_path_prefix}/*"
Condition = { StringNotEquals = { "s3:x-amz-server-side-encryption" = "aws:kms" } }
},
{
Sid = "DenyPutWithWrongKmsKey"
Effect = "Deny"
Principal = "*"
Action = "s3:PutObject"
Resource = "${aws_s3_bucket.rds2bq.arn}/${local.rds2bq_export_path_prefix}/*"
Condition = {
StringNotEquals = {
"s3:x-amz-server-side-encryption-aws-kms-key-id" = [
aws_kms_key.rds2bq.arn,
aws_kms_key.rds2bq.key_id,
aws_kms_alias.rds2bq.arn
]
}
}
},
{
Sid = "DenyPutIfNotExportRole"
Effect = "Deny"
Principal = "*"
Action = ["s3:PutObject"]
Resource = "${aws_s3_bucket.rds2bq.arn}/${local.rds2bq_export_path_prefix}/*"
Condition = {
StringNotEquals = {
"aws:PrincipalArn" = aws_iam_role.rds2bq_rds2s3_rdsexport.arn
}
}
}
]
})
}
IAM Role for RDS Export
Create a role that the export.rds.amazonaws.com service can assume:
resource "aws_iam_role" "rds2bq_rds2s3_rdsexport" {
name = "rds2bq-rds2s3-rdsexport"
assume_role_policy = jsonencode({
Version = "2012-10-17",
Statement = [{
Sid = "OnlyRdsExportServiceInThisAccount",
Effect = "Allow",
Principal = { Service = "export.rds.amazonaws.com" },
Action = "sts:AssumeRole",
Condition = { StringEquals = { "aws:SourceAccount" = data.aws_caller_identity.current.account_id } }
}]
})
}
resource "aws_iam_policy" "rds2bq_rds2s3_rdsexport" {
name = "rds2bq-rds2s3-rdsexport"
policy = jsonencode({
Version = "2012-10-17",
Statement = [
{
Sid = "ObjectRWUnderExportsPrefix",
Effect = "Allow",
Action = ["s3:PutObject*", "s3:GetObject*", "s3:DeleteObject*", "s3:AbortMultipartUpload", "s3:ListMultipartUploadParts"],
Resource = "${aws_s3_bucket.rds2bq.arn}/${local.rds2bq_export_path_prefix}/*"
},
{
Sid = "ListBucketWithPrefix",
Effect = "Allow",
Action = ["s3:ListBucket", "s3:GetBucketLocation", "s3:ListBucketMultipartUploads"],
Resource = aws_s3_bucket.rds2bq.arn
},
{
Sid = "UseKmsKeyForSnapshotExport",
Effect = "Allow",
Action = ["kms:Encrypt", "kms:Decrypt", "kms:ReEncrypt*", "kms:GenerateDataKey*", "kms:DescribeKey"],
Resource = aws_kms_key.rds2bq.arn
}
]
})
}
resource "aws_iam_role_policy_attachment" "rds2bq_rds2s3_rdsexport" {
role = aws_iam_role.rds2bq_rds2s3_rdsexport.name
policy_arn = aws_iam_policy.rds2bq_rds2s3_rdsexport.arn
}
Step Functions
Since RDS Export to S3 is asynchronous, we use Step Functions for orchestration.
The flow is as follows:
- BuildPrefix: Generate the date partition path from the scheduled time
- CreateSnapshot: Create a DB cluster snapshot
- DescribeSnapshot: Poll the snapshot status
- StartExport: Start the S3 Export Task
resource "aws_iam_role" "rds2bq_rds2s3" {
name = "rds2bq-rds2s3"
assume_role_policy = jsonencode({
Version = "2012-10-17",
Statement = [{ Effect = "Allow", Principal = { Service = "states.amazonaws.com" }, Action = "sts:AssumeRole" }]
})
}
resource "aws_iam_policy" "rds2bq_rds2s3" {
name = "rds2bq-rds2s3"
policy = jsonencode({
Version = "2012-10-17",
Statement = [
{ Sid = "StartExport", Effect = "Allow", Action = ["rds:StartExportTask"], Resource = "*" },
{ Sid = "CreateSnapshot", Effect = "Allow", Action = ["rds:CreateDBClusterSnapshot"], Resource = "*" },
{ Sid = "DescribeSnapshot", Effect = "Allow", Action = ["rds:DescribeDBClusterSnapshots"], Resource = "*" },
{ Sid = "TagSnapshot", Effect = "Allow", Action = ["rds:AddTagsToResource", "rds:ListTagsForResource", "rds:RemoveTagsFromResource"], Resource = "*" },
{
Sid = "PassExportRole",
Effect = "Allow",
Action = "iam:PassRole",
Resource = aws_iam_role.rds2bq_rds2s3_rdsexport.arn,
Condition = { StringEquals = { "iam:PassedToService" = "rds.amazonaws.com" } }
},
{ Sid = "AllowKmsCreateGrantForExport", Effect = "Allow", Action = ["kms:CreateGrant", "kms:DescribeKey"], Resource = aws_kms_key.rds2bq.arn },
{ Sid = "AllowCloudWatchLogsDelivery", Effect = "Allow", Action = ["logs:CreateLogDelivery", "logs:GetLogDelivery", "logs:UpdateLogDelivery", "logs:DeleteLogDelivery", "logs:ListLogDeliveries", "logs:PutResourcePolicy", "logs:DescribeResourcePolicies", "logs:DescribeLogGroups"], Resource = "*" }
]
})
}
resource "aws_iam_role_policy_attachment" "rds2bq_rds2s3" {
role = aws_iam_role.rds2bq_rds2s3.name
policy_arn = aws_iam_policy.rds2bq_rds2s3.arn
}
resource "aws_cloudwatch_log_group" "rds2bq_rds2s3" {
name = "/aws/states/rds2bq-rds2s3"
retention_in_days = 30
}
resource "aws_sfn_state_machine" "rds2bq_rds2s3" {
name = "rds2bq-rds2s3"
role_arn = aws_iam_role.rds2bq_rds2s3.arn
definition = jsonencode({
Comment = "Build date partition prefix, create/await DB cluster snapshot, then StartExportTask",
StartAt = "BuildPrefix",
States = {
# $.scheduled_time is assumed to be ISO8601 (e.g., 2026-01-08T18:42:21Z)
BuildPrefix = {
Type = "Pass",
Parameters = {
"year.$" = "States.ArrayGetItem(States.StringSplit($.scheduled_time, '-'), 0)",
"month.$" = "States.ArrayGetItem(States.StringSplit($.scheduled_time, '-'), 1)",
"day.$" = "States.ArrayGetItem(States.StringSplit(States.ArrayGetItem(States.StringSplit($.scheduled_time, 'T'), 0), '-'), 2)",
"hour.$" = "States.ArrayGetItem(States.StringSplit(States.ArrayGetItem(States.StringSplit($.scheduled_time, 'T'), 1), ':'), 0)",
"minute.$" = "States.ArrayGetItem(States.StringSplit(States.ArrayGetItem(States.StringSplit($.scheduled_time, 'T'), 1), ':'), 1)",
"second.$" = "States.ArrayGetItem(States.StringSplit(States.ArrayGetItem(States.StringSplit(States.ArrayGetItem(States.StringSplit($.scheduled_time, 'T'), 1), ':'), 2), '.Zz+-'), 0)"
},
ResultPath = "$.built",
Next = "CreateSnapshot"
},
CreateSnapshot = {
Type = "Task",
Resource = "arn:aws:states:::aws-sdk:rds:createDBClusterSnapshot",
Parameters = {
"DbClusterIdentifier" = var.rds_cluster_identifier,
"DbClusterSnapshotIdentifier.$" = "States.Format('rds2bq-{}{}{}t{}{}{}', $.built.year, $.built.month, $.built.day, $.built.hour, $.built.minute, $.built.second)"
},
ResultPath = "$.create",
Catch = [{
ErrorEquals = ["States.ALL"],
ResultPath = "$.create_error",
Next = "WaitBeforeDescribe"
}],
Next = "WaitBeforeDescribe"
},
WaitBeforeDescribe = { Type = "Wait", Seconds = 20, Next = "DescribeSnapshot" },
DescribeSnapshot = {
Type = "Task",
Resource = "arn:aws:states:::aws-sdk:rds:describeDBClusterSnapshots",
Parameters = {
"DbClusterSnapshotIdentifier.$" = "States.Format('rds2bq-{}{}{}t{}{}{}', $.built.year, $.built.month, $.built.day, $.built.hour, $.built.minute, $.built.second)"
},
ResultPath = "$.desc",
Catch = [{
ErrorEquals = ["States.ALL"],
ResultPath = "$.describe_error",
Next = "WaitAndRetryDescribe"
}],
Next = "CheckSnapshotStatus"
},
CheckSnapshotStatus = {
Type = "Choice",
Choices = [
{ Variable = "$.desc.DbClusterSnapshots[0].Status", StringEquals = "available", Next = "StartExport" },
{ Variable = "$.desc.DbClusterSnapshots[0].Status", StringEquals = "failed", Next = "FailSnapshot" }
],
Default = "WaitAndRetryDescribe"
},
WaitAndRetryDescribe = { Type = "Wait", Seconds = 15, Next = "DescribeSnapshot" },
StartExport = {
Type = "Task",
Resource = "arn:aws:states:::aws-sdk:rds:startExportTask",
Parameters = {
"ExportTaskIdentifier.$" = "States.Format('rds2bq-{}{}{}t{}{}{}', $.built.year, $.built.month, $.built.day, $.built.hour, $.built.minute, $.built.second)",
"SourceArn.$" = "$.desc.DbClusterSnapshots[0].DbClusterSnapshotArn",
"S3BucketName" = aws_s3_bucket.rds2bq.bucket,
"S3Prefix.$" = "States.Format('${local.rds2bq_export_path_prefix}/year={}/month={}/date={}', $.built.year, $.built.month, $.built.day)",
"IamRoleArn" = aws_iam_role.rds2bq_rds2s3_rdsexport.arn,
"KmsKeyId" = aws_kms_key.rds2bq.arn
},
End = true
},
FailSnapshot = { Type = "Fail", Error = "SnapshotFailed", Cause = "DB cluster snapshot status=failed" }
}
})
logging_configuration {
include_execution_data = true
level = "ALL"
log_destination = "${aws_cloudwatch_log_group.rds2bq_rds2s3.arn}:*"
}
}
The complexity of second.$ in BuildPrefix comes from the timezone designator (Z or an offset such as +09:00), and possibly fractional seconds, at the end of the ISO 8601 timestamp: simply splitting on ':' would leave those extra characters attached to the seconds field.
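For illustration, here is a rough Go sketch of the same splitting logic (assuming States.StringSplit treats each character of its delimiter argument as a separator, which is how the state machine above uses it; this is not part of the pipeline):
package main

import (
	"fmt"
	"strings"
)

// splitAny mimics how the state machine uses States.StringSplit:
// every character in delims acts as a delimiter.
func splitAny(s, delims string) []string {
	return strings.FieldsFunc(s, func(r rune) bool {
		return strings.ContainsRune(delims, r)
	})
}

func main() {
	for _, ts := range []string{"2026-01-08T18:42:21Z", "2026-01-08T18:42:21+09:00"} {
		timePart := splitAny(ts, "T")[1]       // "18:42:21Z" or "18:42:21+09:00"
		hms := splitAny(timePart, ":")         // ["18" "42" "21Z"] or ["18" "42" "21+09" "00"]
		second := splitAny(hms[2], ".Zz+-")[0] // "21" in both cases
		fmt.Println(hms[0], hms[1], second)
	}
}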
EventBridge Scheduler
Scheduler to trigger Step Functions at JST 00:30:
resource "aws_iam_role" "rds2bq_rds2s3_scheduler" {
name = "rds2bq-rds2s3-scheduler"
assume_role_policy = jsonencode({
Version = "2012-10-17",
Statement = [{
Effect = "Allow",
Principal = { Service = "scheduler.amazonaws.com" },
Action = "sts:AssumeRole"
}]
})
}
resource "aws_iam_policy" "rds2bq_rds2s3_scheduler" {
name = "rds2bq-rds2s3-scheduler"
policy = jsonencode({
Version = "2012-10-17",
Statement = [
{ Sid = "AllowStartExportTask", Effect = "Allow", Action = ["rds:StartExportTask"], Resource = "*" },
{ Sid = "AllowStartStepFn", Effect = "Allow", Action = ["states:StartExecution"], Resource = aws_sfn_state_machine.rds2bq_rds2s3.arn },
{
Sid = "AllowPassOnlyExportRoleToRds",
Effect = "Allow",
Action = "iam:PassRole",
Resource = aws_iam_role.rds2bq_rds2s3_rdsexport.arn,
Condition = { StringEquals = { "iam:PassedToService" = "rds.amazonaws.com" } }
}
]
})
}
resource "aws_iam_role_policy_attachment" "rds2bq_rds2s3_scheduler" {
role = aws_iam_role.rds2bq_rds2s3_scheduler.name
policy_arn = aws_iam_policy.rds2bq_rds2s3_scheduler.arn
}
resource "aws_scheduler_schedule" "rds2bq_rds2s3" {
name = "rds2bq-rds2s3"
group_name = "default"
description = "Daily: Step Functions -> RDS StartExportTask"
schedule_expression = "cron(30 15 * * ? *)" # UTC 15:30 = JST 00:30
flexible_time_window { mode = "OFF" }
target {
arn = "arn:aws:scheduler:::aws-sdk:sfn:startExecution"
role_arn = aws_iam_role.rds2bq_rds2s3_scheduler.arn
# NOTE: Using heredoc because jsonencode escapes <aws.scheduler.scheduled-time>
input = <<-EOF
{
"StateMachineArn": "${aws_sfn_state_machine.rds2bq_rds2s3.arn}",
"Input": "{\"scheduled_time\":\"<aws.scheduler.scheduled-time>\"}"
}
EOF
}
}
Using jsonencode escapes <aws.scheduler.scheduled-time> to \u003c...\u003e, which prevents EventBridge Scheduler from recognizing it as a context variable. A heredoc must be used instead.
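The \u003c escaping is the same HTML-safe escaping that Go's encoding/json applies by default. A minimal Go sketch, purely for illustration of why the placeholder gets mangled and why passing the raw string (as the heredoc does) avoids it:
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

func main() {
	payload := map[string]string{"scheduled_time": "<aws.scheduler.scheduled-time>"}

	// Default marshalling HTML-escapes angle brackets, which mangles the
	// EventBridge Scheduler placeholder.
	escaped, _ := json.Marshal(payload)
	fmt.Println(string(escaped)) // {"scheduled_time":"\u003caws.scheduler.scheduled-time\u003e"}

	// With HTML escaping disabled, the placeholder survives intact.
	var buf bytes.Buffer
	enc := json.NewEncoder(&buf)
	enc.SetEscapeHTML(false)
	_ = enc.Encode(payload)
	fmt.Print(buf.String()) // {"scheduled_time":"<aws.scheduler.scheduled-time>"}
}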
IAM Role for S3 → GCS Transfer
IAM role for Storage Transfer Service to access AWS S3. Uses Web Identity Federation to trust the GCP service account:
resource "aws_iam_role" "rds2bq_s32gcs" {
name = "rds2bq-s32gcs"
assume_role_policy = jsonencode({
"Version" : "2012-10-17",
"Statement" : [{
"Effect" : "Allow",
"Principal" : { "Federated" : "accounts.google.com" },
"Action" : "sts:AssumeRoleWithWebIdentity",
"Condition" : {
"StringEquals" : {
"accounts.google.com:sub" : data.google_storage_transfer_project_service_account.current.subject_id
}
}
}]
})
}
resource "aws_iam_policy" "rds2bq_s32gcs" {
name = "rds2bq-s32gcs"
policy = jsonencode({
"Version" : "2012-10-17",
"Statement" : [{
"Effect" : "Allow",
"Action" : ["s3:Get*", "s3:List*", "s3:Delete*"],
"Resource" : "*"
}]
})
}
resource "aws_iam_role_policy_attachment" "rds2bq_s32gcs" {
role = aws_iam_role.rds2bq_s32gcs.name
policy_arn = aws_iam_policy.rds2bq_s32gcs.arn
}
GCP Implementation
Storage Transfer Service Permissions
resource "google_project_iam_member" "storage_transfer_storage_admin" {
  project = data.google_project.current.project_id
role = "roles/storage.admin"
member = data.google_storage_transfer_project_service_account.current.member
}
GCS Bucket and Storage Transfer Job
resource "google_storage_bucket" "rds2bq" {
name = "${data.google_project.current.name}-rds2bq"
location = var.gcp_region
uniform_bucket_level_access = true
public_access_prevention = "enforced"
lifecycle_rule {
action { type = "Delete" }
condition { age = 30 }
}
}
resource "google_storage_bucket_iam_member" "rds2bq_storage_transfer" {
bucket = google_storage_bucket.rds2bq.name
role = "roles/storage.objectAdmin"
member = data.google_storage_transfer_project_service_account.current.member
}
resource "google_storage_transfer_job" "rds2bq_s32gcs_daily" {
description = "Daily copy Parquet from AWS S3 to GCS"
  project     = data.google_project.current.project_id
transfer_spec {
aws_s3_data_source {
bucket_name = aws_s3_bucket.rds2bq.bucket
managed_private_network = true
role_arn = aws_iam_role.rds2bq_s32gcs.arn
}
gcs_data_sink {
bucket_name = google_storage_bucket.rds2bq.name
}
object_conditions {
include_prefixes = ["${local.rds2bq_export_path_prefix}/"]
}
transfer_options {
overwrite_objects_already_existing_in_sink = false
}
}
schedule {
schedule_start_date {
year = 2025
month = 1
day = 1
}
repeat_interval = "3600s" # hourly
}
}
Cloud Functions (Path Reorganization)
The output path structure from RDS Export to S3 is as follows:
exports/year=2026/month=01/date=09/rds2bq-20260109t003000/{database}/{database}.{table}/{partition}/part-00000.parquet
This is transformed to the following structure for easier reference by BigQuery external tables:
reorganized/dbcluster/{cluster_id}/database/{database}/table/{table}/part-00000.parquet
Cloud Functions Source Code
./functions/rds2bq-reorganizer/main.go:
package rds2bqreorganizer
import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
"regexp"
"strings"
"cloud.google.com/go/storage"
"github.com/GoogleCloudPlatform/functions-framework-go/functions"
"github.com/cloudevents/sdk-go/v2/event"
)
func init() {
functions.CloudEvent("ReorganizeParquet", reorganizeParquet)
}
type StorageObjectData struct {
Bucket string `json:"bucket"`
Name string `json:"name"`
}
func reorganizeParquet(ctx context.Context, e event.Event) error {
var data StorageObjectData
if err := e.DataAs(&data); err != nil {
return fmt.Errorf("event.DataAs: %w", err)
}
// Only process .parquet files under exports/
if !strings.HasPrefix(data.Name, "exports/") || !strings.HasSuffix(data.Name, ".parquet") {
return nil
}
clusterID := os.Getenv("DB_CLUSTER_IDENTIFIER")
if clusterID == "" {
return errors.New("DB_CLUSTER_IDENTIFIER environment variable is not set")
}
newPath, err := buildNewPath(data.Name, clusterID)
if err != nil {
return nil // Skip files that cannot be parsed
}
client, err := storage.NewClient(ctx)
if err != nil {
return fmt.Errorf("storage.NewClient: %w", err)
}
defer client.Close()
src := client.Bucket(data.Bucket).Object(data.Name)
dst := client.Bucket(data.Bucket).Object(newPath)
if _, err := dst.CopierFrom(src).Run(ctx); err != nil {
return fmt.Errorf("copy failed: %w", err)
}
return nil
}
var pathPattern = regexp.MustCompile(
`^exports/year=(\d{4})/month=(\d{2})/date=(\d{2})/([^/]+)/([^/]+)/([^/]+)/([^/]+)/(.+\.parquet)$`,
)
func buildNewPath(srcPath, clusterID string) (string, error) {
matches := pathPattern.FindStringSubmatch(srcPath)
if matches == nil {
return "", fmt.Errorf("path does not match expected pattern: %s", srcPath)
}
database := matches[5]
dbTable := matches[6] // database.table format
filename := matches[8]
// Extract table name from database.table
table := strings.TrimPrefix(dbTable, database+".")
return filepath.Join(
"reorganized",
"dbcluster", clusterID,
"database", database,
"table", table,
filename,
), nil
}
./functions/rds2bq-reorganizer/go.mod:
module rds2bqreorganizer
go 1.24
require (
cloud.google.com/go/storage v1.43.0
github.com/GoogleCloudPlatform/functions-framework-go v1.8.1
github.com/cloudevents/sdk-go/v2 v2.15.2
)
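As a quick sanity check (not required for deployment), a minimal test sketch for buildNewPath, placed next to main.go as, say, main_test.go. The database, table, and cluster names are the example values from terraform.tfvars:
package rds2bqreorganizer

import "testing"

// Verifies the transformation described above:
// exports/year=.../{snapshot}/{db}/{db}.{table}/{partition}/part-00000.parquet
//   -> reorganized/dbcluster/{cluster}/database/{db}/table/{table}/part-00000.parquet
func TestBuildNewPath(t *testing.T) {
	src := "exports/year=2026/month=01/date=09/rds2bq-20260109t003000/mydb/mydb.users/1/part-00000.parquet"
	want := "reorganized/dbcluster/my-aurora-cluster/database/mydb/table/users/part-00000.parquet"

	got, err := buildNewPath(src, "my-aurora-cluster")
	if err != nil {
		t.Fatalf("buildNewPath: %v", err)
	}
	if got != want {
		t.Errorf("got %q, want %q", got, want)
	}
}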
Deploying Cloud Functions with Terraform
# Bucket for Cloud Functions source code
resource "google_storage_bucket" "rds2bq_functions" {
name = "${data.google_project.current.name}-rds2bq-functions"
location = var.gcp_region
uniform_bucket_level_access = true
public_access_prevention = "enforced"
}
# ZIP the source code
data "archive_file" "rds2bq_reorganizer" {
type = "zip"
source_dir = "${path.module}/functions/rds2bq-reorganizer"
output_path = "${path.module}/functions/rds2bq-reorganizer.zip"
}
resource "google_storage_bucket_object" "rds2bq_reorganizer" {
name = "rds2bq-reorganizer-${data.archive_file.rds2bq_reorganizer.output_md5}.zip"
bucket = google_storage_bucket.rds2bq_functions.name
source = data.archive_file.rds2bq_reorganizer.output_path
}
# Service account for Cloud Functions execution
resource "google_service_account" "rds2bq_reorganizer" {
account_id = "rds2bq-reorganizer"
display_name = "Cloud Function rds2bq-reorganizer"
}
resource "google_storage_bucket_iam_member" "rds2bq_reorganizer_object_admin" {
bucket = google_storage_bucket.rds2bq.name
role = "roles/storage.objectAdmin"
member = google_service_account.rds2bq_reorganizer.member
}
resource "google_project_iam_member" "rds2bq_reorganizer_event_receiver" {
project = data.google_project.current.project_id
role = "roles/eventarc.eventReceiver"
member = google_service_account.rds2bq_reorganizer.member
}
# Cloud Functions (2nd gen)
resource "google_cloudfunctions2_function" "rds2bq_reorganizer" {
name = "rds2bq-reorganizer"
location = var.gcp_region
build_config {
runtime = "go124"
entry_point = "ReorganizeParquet"
source {
storage_source {
bucket = google_storage_bucket.rds2bq_functions.name
object = google_storage_bucket_object.rds2bq_reorganizer.name
}
}
}
service_config {
max_instance_count = 10
available_memory = "256Mi"
timeout_seconds = 60
service_account_email = google_service_account.rds2bq_reorganizer.email
environment_variables = {
DB_CLUSTER_IDENTIFIER = var.rds_cluster_identifier
}
}
event_trigger {
trigger_region = var.gcp_region
event_type = "google.cloud.storage.object.v1.finalized"
service_account_email = google_service_account.rds2bq_reorganizer.email
event_filters {
attribute = "bucket"
value = google_storage_bucket.rds2bq.name
}
retry_policy = "RETRY_POLICY_RETRY"
}
}
resource "google_cloud_run_service_iam_member" "rds2bq_reorganizer_invoker" {
location = google_cloudfunctions2_function.rds2bq_reorganizer.location
service = google_cloudfunctions2_function.rds2bq_reorganizer.name
role = "roles/run.invoker"
member = google_service_account.rds2bq_reorganizer.member
}
BigQuery External Tables
Create external tables that reference the reorganized paths:
resource "google_bigquery_dataset" "rds2bq" {
dataset_id = "rds2bq_${var.database_name}"
location = var.gcp_region
delete_contents_on_destroy = false
}
resource "google_bigquery_table" "rds2bq" {
for_each = toset(var.table_names)
dataset_id = google_bigquery_dataset.rds2bq.dataset_id
table_id = each.key
deletion_protection = false
external_data_configuration {
source_format = "PARQUET"
autodetect = true
source_uris = [
"gs://${google_storage_bucket.rds2bq.name}/reorganized/dbcluster/${var.rds_cluster_identifier}/database/${var.database_name}/table/${each.key}/*.parquet"
]
}
}
Using for_each, external tables can be created in bulk from the list of table names.
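Once the first export has landed in reorganized/, the external tables can be queried like any other table. A minimal Go sketch, assuming the example project, database, and table names from terraform.tfvars (my-gcp-project, mydb, users); this is illustration only, not part of the Terraform code:
package main

import (
	"context"
	"fmt"
	"log"

	"cloud.google.com/go/bigquery"
	"google.golang.org/api/iterator"
)

func main() {
	ctx := context.Background()

	// Project, dataset, and table names are the example values from this article.
	client, err := bigquery.NewClient(ctx, "my-gcp-project")
	if err != nil {
		log.Fatalf("bigquery.NewClient: %v", err)
	}
	defer client.Close()

	// The external table reads the Parquet files under reorganized/ on every query.
	q := client.Query("SELECT COUNT(*) AS cnt FROM `rds2bq_mydb.users`")
	it, err := q.Read(ctx)
	if err != nil {
		log.Fatalf("query: %v", err)
	}
	for {
		var row []bigquery.Value
		err := it.Next(&row)
		if err == iterator.Done {
			break
		}
		if err != nil {
			log.Fatalf("iterate: %v", err)
		}
		fmt.Println(row[0])
	}
}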
Additional Notes
- RDS Export to S3 is asynchronous with no completion notification, so Step Functions polls for snapshot completion before starting the Export
- Storage Transfer Service runs hourly. Export completion timing varies slightly day by day, so we allow some buffer
- Cloud Functions is triggered by Eventarc’s object.finalized event whenever a Parquet file is created
- BigQuery external tables read from GCS on every query, so loading into native tables may be preferable for frequent queries (a sketch follows after this list)
- Both the S3 and GCS buckets have 30-day lifecycle rules, so old exports are deleted automatically
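If the same tables are queried frequently, one option is a periodic load job from the reorganized Parquet files into a native table. A minimal Go sketch, again assuming the example bucket, dataset, and table names from this article (the destination table users_native is hypothetical):
package main

import (
	"context"
	"log"

	"cloud.google.com/go/bigquery"
)

func main() {
	ctx := context.Background()

	client, err := bigquery.NewClient(ctx, "my-gcp-project") // example project ID
	if err != nil {
		log.Fatalf("bigquery.NewClient: %v", err)
	}
	defer client.Close()

	// Load the reorganized Parquet files into a native table instead of
	// reading them through the external table on every query.
	gcsRef := bigquery.NewGCSReference(
		"gs://my-gcp-project-rds2bq/reorganized/dbcluster/my-aurora-cluster/database/mydb/table/users/*.parquet")
	gcsRef.SourceFormat = bigquery.Parquet

	loader := client.Dataset("rds2bq_mydb").Table("users_native").LoaderFrom(gcsRef)
	loader.WriteDisposition = bigquery.WriteTruncate // replace the table contents on each run

	job, err := loader.Run(ctx)
	if err != nil {
		log.Fatalf("loader.Run: %v", err)
	}
	status, err := job.Wait(ctx)
	if err != nil {
		log.Fatalf("job.Wait: %v", err)
	}
	if status.Err() != nil {
		log.Fatalf("load failed: %v", status.Err())
	}
}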