Tools: Boosting Lightweight ETL on AWS Lambda & Glue Python Shell with DuckDB and Apache Arrow Dataset
2026-03-06
Original Japanese article: AWS Lambda/Glue Python Shell×DuckDBの軽量ETLをApache Arrow Datasetで高速化してみた

## Introduction

I'm Aki, an AWS Community Builder (@jitepengin).

In my previous articles, I introduced lightweight ETL using AWS Lambda and Glue Python Shell.
In the process, I found that DuckDB's performance was not as high as expected:

- Does Increasing AWS Lambda Memory to 10GB Really Make It Faster? (AWS Lambda chDB/DuckDB PyIceberg Benchmark)
- AWS Lambda and AWS Glue Python Shell in the Context of Lightweight ETL

In this article, I cover what became the bottleneck for DuckDB, how Apache Arrow Dataset can improve performance, and the trade-offs I observed.

## Recap of Previous Articles

The previous articles compared performance on the same files, using NYC taxi data:
https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page

Lambda measurements were taken with memory configurations of 1024MB, 2048MB, and the maximum 3008MB (without quota increase).
Glue Python Shell tests were performed with DPU settings of 1/16 and 1. Since memory usage cannot be directly compared, we focus only on execution time.

## What Caused the DuckDB Bottleneck?

When loading Parquet directly into DuckDB, the flow is typically: S3 → DuckDB read_parquet → Filter / Query.

Even if the query itself is light, S3 scanning becomes the bottleneck, lowering DuckDB's standalone performance.
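One way to check where the time goes is to time each stage separately. The sketch below is a minimal, standard-library-only illustration: the `timed` helper, the `stage_seconds` dictionary, and the stage names are hypothetical and not taken from the original benchmark code, and the two list comprehensions merely stand in for the S3 read and the query.

```python
import time
from contextlib import contextmanager

# Hypothetical helper: accumulated wall-clock seconds per named stage.
stage_seconds = {}


@contextmanager
def timed(stage):
    """Add the elapsed time of the enclosed block to stage_seconds[stage]."""
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        stage_seconds[stage] = stage_seconds.get(stage, 0.0) + elapsed


def share_of_total(stage):
    """Return the fraction of the total measured time spent in one stage."""
    total = sum(stage_seconds.values())
    return stage_seconds[stage] / total if total else 0.0


# Placeholder workloads standing in for "S3 scan + Parquet decode" and "query".
with timed("scan_and_decode"):
    rows = [{"VendorID": i % 2 + 1} for i in range(100_000)]
with timed("query"):
    kept = [row for row in rows if row["VendorID"] == 1]

for stage in stage_seconds:
    print(f"{stage}: {stage_seconds[stage]:.4f}s ({share_of_total(stage):.0%})")
```

In a real job, the `with timed(...)` blocks would wrap the read call and the DuckDB query respectively.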
Measurements showed that in Glue Python Shell, 176 of the total 210 seconds (~84%) were spent in S3 Scan + Parquet Decode.

Solution: Use Apache Arrow Dataset to separate reading from querying and improve performance.

## What is Apache Arrow Dataset?

https://arrow.apache.org/docs/python/dataset.html

Apache Arrow Dataset is a library for efficiently reading Parquet or CSV files using the columnar Arrow in-memory format.

By leveraging its features (fast Parquet reading, efficient decoding, parallelized S3 reads, and filter/projection pushdown), the S3 Scan + Parquet Decode bottleneck can be greatly reduced.

## Architecture Overview

The architecture is the same as in previous articles.

Note: This article focuses on the differences in operation, so version updates and conflict handling in Iceberg tables are omitted.

## Benchmarking

Using the same dataset and queries as in previous articles:

[Figures: results for the 48MB file (1 month, memory=3008MB) and the 807MB file (1 year, memory=10240MB)]

As a result, both AWS Lambda and Glue Python Shell were able to achieve significant performance improvements compared to chDB.
In other words, addressing the S3 scan and Parquet decoding bottlenecks seems to be the key to improving DuckDB processing.

However, in the case of Lambda, large file sizes can lead to high memory usage, potentially exceeding the memory limits.
This means that careful consideration of where and how to use this approach is necessary.

## Memory Consideration with Arrow Dataset

In this process, dataset.to_table() materializes the entire dataset in memory as an Arrow Table.
Arrow Tables use a columnar in-memory format, which is very fast, but loading the entire file at once can result in high memory usage.

For example, when an 807MB Parquet file is read in Lambda, the memory footprint of the Arrow Table can be much larger than the compressed Parquet file size.
While to_table() is convenient, it is important to be aware that it can significantly increase memory consumption depending on the processing.

Lambda can also use /tmp for disk-backed processing, but processing in memory is much faster.
However, due to memory limits, expanding a large file into an Arrow Table can quickly consume a large amount of memory.

For simple queries, one approach is to iterate over row groups instead of materializing the entire table in memory, processing small chunks at a time.
This method can potentially keep memory usage within a few hundred MB.

## Trade-Offs

Using Arrow Dataset significantly improves the speed of S3 reads and Parquet decoding.
However, expanding the dataset all at once with to_table() increases memory usage, which may hit Lambda's memory limits.

Therefore, it is important to design your ETL with a balance between performance and memory usage in mind.
For small files or when Lambda has sufficient memory, loading the full dataset at once is fine.
For larger files, consider chunked processing by row group or pushdown filters to keep memory usage under control.

## Pushdown with Arrow Dataset

Using Arrow Dataset's Filter/Projection Pushdown, you can load only the row groups you need from S3. You apply it by passing the columns and filter arguments to dataset.to_table().

## Experimenting in Lambda

You can integrate pushdown into your existing Lambda code, for example by passing the filter to dataset.to_table() instead of filtering with a WHERE clause in DuckDB.

With pushdown, memory usage was reduced significantly.

This two-step approach, filtering unnecessary data with Arrow Dataset before passing it to DuckDB, proves to be effective for large datasets.

## Conclusion

In this article, we explored performance improvements for a lightweight ETL built with AWS Lambda / Glue Python Shell × DuckDB × PyIceberg.

For lightweight ETL, especially on AWS Lambda, processing time is a critical factor. By using Apache Arrow Dataset, we were able to significantly improve performance by offloading S3 reading and Parquet decoding before running queries in DuckDB.

However, there are trade-offs. Expanding an entire dataset into memory with to_table() can lead to high memory usage, which may exceed Lambda's limits for large files. Therefore, careful separation of responsibilities and chunked processing (e.g., row group iteration or pushdown filters) are important considerations.

With the architecture presented here, even large files can be processed quickly in a lightweight ETL on AWS. While complex queries may still face performance limitations, this approach provides a practical and efficient option for real-time or near-real-time ETL in a Lakehouse environment using Apache Iceberg.

We hope this article serves as a reference for those exploring lightweight data processing and ETL patterns on Iceberg tables.

CODE_BLOCK:
S3
↓
DuckDB read_parquet
↓
Filter / Query
COMMAND_BLOCK:
import duckdb
import pyarrow.dataset as ds
import pyarrow.fs as fs
import boto3
from urllib.parse import unquote_plus
from pyiceberg.catalog.glue import GlueCatalog


def lambda_handler(event, context):
    try:
        # DuckDB setup in Lambda
        duckdb_connection = duckdb.connect(database=':memory:')

        # Retrieve S3 path from event (object keys in S3 events are URL-encoded)
        s3_bucket = event['Records'][0]['s3']['bucket']['name']
        s3_object_key = unquote_plus(event['Records'][0]['s3']['object']['key'])
        s3_input_path = f"{s3_bucket}/{s3_object_key}"
        print(f"S3 input path: {s3_input_path}")

        # Read Parquet from S3 using Arrow Dataset
        # Use boto3 session to get temporary credentials
        session = boto3.Session()
        credentials = session.get_credentials().get_frozen_credentials()
        s3 = fs.S3FileSystem(
            region="ap-northeast-1",
            access_key=credentials.access_key,
            secret_key=credentials.secret_key,
            session_token=credentials.token
        )

        # Load dataset with Arrow Dataset
        dataset = ds.dataset(
            s3_input_path,
            filesystem=s3,
            format="parquet"
        )

        # Convert dataset to Arrow Table (in-memory)
        arrow_table = dataset.to_table()
        print(f"Number of rows retrieved: {arrow_table.num_rows}")
        print(f"Schema: {arrow_table.schema}")

        # DuckDB processing (SQL query)
        # Use DuckDB from_arrow to run SQL on Arrow
        rel = duckdb_connection.from_arrow(arrow_table)
        result_arrow_table = duckdb_connection.execute(
            """
            SELECT * FROM rel WHERE VendorID = 1
            """
        ).fetch_arrow_table()

        # Configure Glue Catalog (to access Iceberg table)
        catalog = GlueCatalog(region_name="ap-northeast-1", database="icebergdb", name="my_catalog")  # Adjust to your environment.

        # Load the table
        namespace = "icebergdb"  # Adjust to your environment.
        table_name = "yellow_tripdata"  # Adjust to your environment.
        iceberg_table = catalog.load_table(f"{namespace}.{table_name}")

        # Append data to the Iceberg table in bulk
        iceberg_table.append(result_arrow_table)
        print("Data has been appended to S3 in Iceberg format.")

    except Exception as e:
        print(f"An error occurred: {e}")
COMMAND_BLOCK:
import boto3
import sys
import os
import traceback
import pyarrow.dataset as ds
import pyarrow.fs as fs
from awsglue.utils import getResolvedOptions


def get_job_parameters():
    args = getResolvedOptions(sys.argv, ['s3_input'])
    s3_path = args['s3_input']
    print(f"object: {s3_path}")
    return s3_path


def setup_duckdb_environment():
    # Point HOME at /tmp so DuckDB has a writable directory
    try:
        duckdb_dir = '/tmp/.duckdb'
        os.environ['HOME'] = '/tmp'
        os.makedirs(duckdb_dir, exist_ok=True)
        print(f"DuckDB environment setup completed: {duckdb_dir}")
        return True
    except Exception as e:
        print(f"DuckDB environment setup error: {e}")
        return False


def read_parquet_with_arrow_dataset(s3_input):
    print("Reading with Arrow Dataset...")
    session = boto3.Session()
    credentials = session.get_credentials().get_frozen_credentials()
    s3 = fs.S3FileSystem(
        region="ap-northeast-1",
        access_key=credentials.access_key,
        secret_key=credentials.secret_key,
        session_token=credentials.token
    )
    # Arrow's S3FileSystem expects "bucket/key" without the scheme
    path = s3_input.replace("s3://", "")
    dataset = ds.dataset(
        path,
        filesystem=s3,
        format="parquet"
    )
    table = dataset.to_table()
    print(f"Arrow read rows: {table.num_rows}")
    return table


def process_with_duckdb(arrow_table):
    # Imported here so that HOME is already set by setup_duckdb_environment()
    import duckdb
    con = duckdb.connect(":memory:")
    try:
        # Create the relation on the same connection that runs the query
        rel = con.from_arrow(arrow_table)
        result = con.execute("""
            SELECT * FROM rel WHERE VendorID = 1
        """).arrow()
        print(f"DuckDB filtered rows: {result.num_rows}")
        return result
    finally:
        con.close()


def write_iceberg_table(arrow_table):
    try:
        print("Writing started...")
        from pyiceberg.catalog import load_catalog
        catalog_config = {
            "type": "glue",
            "warehouse": "s3://your-bucket/your-warehouse/",
            "region": "ap-northeast-1"
        }
        catalog = load_catalog("glue_catalog", **catalog_config)
        table_identifier = "icebergdb.yellow_tripdata"
        table = catalog.load_table(table_identifier)
        print(f"Target data to write: {arrow_table.num_rows:,} rows")
        table.append(arrow_table)
        return True
    except Exception as e:
        print(f"Writing error: {e}")
        traceback.print_exc()
        return False


def main():
    if not setup_duckdb_environment():
        print("Failed to set up DuckDB environment")
        return
    try:
        s3_input = get_job_parameters()
        # Arrow Dataset read
        arrow_tbl = read_parquet_with_arrow_dataset(s3_input)
        # DuckDB SQL filter
        result_tbl = process_with_duckdb(arrow_tbl)
        # Iceberg write
        if write_iceberg_table(result_tbl):
            print("Writing fully successful!")
        else:
            print("Writing failed")
    except Exception as e:
        print(f"Main error: {e}")
        traceback.print_exc()


if __name__ == "__main__":
    main()
CODE_BLOCK:
dataset = ds.dataset(
    s3_input_path,
    filesystem=s3,
    format="parquet"
)
arrow_table = dataset.to_table()
CODE_BLOCK:
arrow_table = dataset.to_table(
    columns=["VendorID", "tpep_pickup_datetime"],
    filter=ds.field("VendorID") == 1
)
COMMAND_BLOCK:
import duckdb
import pyarrow.dataset as ds
import pyarrow.fs as fs
import boto3
from urllib.parse import unquote_plus
from pyiceberg.catalog.glue import GlueCatalog


def lambda_handler(event, context):
    try:
        # DuckDB setup in Lambda
        duckdb_connection = duckdb.connect(database=':memory:')

        # Retrieve S3 path from event (object keys in S3 events are URL-encoded)
        s3_bucket = event['Records'][0]['s3']['bucket']['name']
        s3_object_key = unquote_plus(event['Records'][0]['s3']['object']['key'])
        s3_input_path = f"{s3_bucket}/{s3_object_key}"
        print(f"S3 input path: {s3_input_path}")

        # Read Parquet from S3 using Arrow Dataset
        # Use boto3 session to get temporary credentials
        session = boto3.Session()
        credentials = session.get_credentials().get_frozen_credentials()
        s3 = fs.S3FileSystem(
            region="ap-northeast-1",
            access_key=credentials.access_key,
            secret_key=credentials.secret_key,
            session_token=credentials.token
        )

        # Load dataset with Arrow Dataset
        dataset = ds.dataset(
            s3_input_path,
            filesystem=s3,
            format="parquet"
        )

        # Convert dataset to Arrow Table, with the filter pushed down to the scan
        arrow_table = dataset.to_table(filter=ds.field("VendorID") == 1)
        print(f"Number of rows retrieved: {arrow_table.num_rows}")
        print(f"Schema: {arrow_table.schema}")

        # DuckDB processing (SQL query)
        # The rows are already filtered, so DuckDB only selects them here
        rel = duckdb_connection.from_arrow(arrow_table)
        result_arrow_table = duckdb_connection.execute(
            """
            SELECT * FROM rel
            """
        ).fetch_arrow_table()

        # Configure Glue Catalog (to access Iceberg table)
        catalog = GlueCatalog(region_name="ap-northeast-1", database="icebergdb", name="my_catalog")  # Adjust to your environment.

        # Load the table
        namespace = "icebergdb"  # Adjust to your environment.
        table_name = "yellow_tripdata"  # Adjust to your environment.
        iceberg_table = catalog.load_table(f"{namespace}.{table_name}")

        # Append data to the Iceberg table in bulk
        iceberg_table.append(result_arrow_table)
        print("Data has been appended to S3 in Iceberg format.")

    except Exception as e:
        print(f"An error occurred: {e}")
CODE_BLOCK:
S3 Parquet (raw data)
  │
  ▼
Arrow Dataset → row group scan + simple filter
  │
  ▼
DuckDB → SQL query (JOIN, GROUP BY, Window functions)
  │
  ▼
PyIceberg → Iceberg table write

Datasets (Recap of Previous Articles):
- January 2024 Yellow Taxi Trip Records (2,964,624 records, 48MB)
- Full-year 2024 Yellow Taxi Trip Records (41,169,720 records, 807MB)

What Caused the DuckDB Bottleneck?
- S3 Scan: Reading the entire dataset involves heavy network I/O, which can take most of the time.
- Parquet Decode: Decoding inside DuckDB adds CPU load.
- Query Processing: For simple filters like WHERE VendorID = 1, query time is minimal.

What is Apache Arrow Dataset?
- Fast Parquet reading
- Efficient decode operations
- Parallelized S3 reads
- Filter/Projection pushdown to reduce I/O

Architecture Overview (AWS Lambda):
- Arrow Dataset separates S3 reading from DuckDB querying
- Light filters can be applied in Arrow Dataset alone for speed

Architecture Overview (Glue Python Shell):
- Glue Python Shell can execute ETL in the same Lambda-style configuration
- Responsibilities separated: Arrow Dataset for reading, DuckDB for SQL query
- Lightweight filters can be processed efficiently

Trade-Offs:
- Pros: Decoding and I/O are faster, resulting in improved performance.
- Cons: Materializing the entire file consumes a lot of memory, and for large files, Lambda may run into OutOfMemory errors.

Pushdown with Arrow Dataset:
- Only the necessary row groups are read (this is crucial for large datasets).
- Reduces network I/O from S3.
- Can further shorten processing time.
- Arrow Dataset is optimal for lightweight reads and simple filters; complex queries should still be handled in DuckDB.

Insights:
- DuckDB alone is slow due to S3 Scan + Parquet Decode
- DuckDB shines with complex queries (JOIN, GROUP BY, WINDOW)
- Pushdown is key in Arrow Dataset
- Separating responsibilities (Arrow Dataset + DuckDB) enables efficient ETL in Lambda/Glue
- chDB offers balanced memory and speed out-of-the-box