Using streams in Python for highly resource-efficient ingestion in ETL pipelines
How to not waste your memory, disk, and CPU, one step at a time.
Whenever we design pipelines that process data, tradeoffs and bottlenecks appear along the way. Say we are loading a large file: if we load that file wholly into memory, we are constrained by the maximum memory capacity of the computing node we're working with.
As data engineers, we may be familiar with various ways to remove certain constraints. We may process data in chunks, process it in parallel, process it asynchronously, slap a data orchestrator on it, and maybe even go crazy with auto-scaling policies in Kubernetes.
Today, I want to talk about an ELT/ETL pattern that is often overlooked but can be quite powerful and flexible when implemented properly. This pattern is streaming.
…wait, but I know what streaming is, isn't that about data latency and using fancy frameworks like Kafka to process my data? Yes, that is a form of streaming, but not the form this article will dive deeper into.
In this article, we will dive deeper into various kinds of streaming data types in Python and how we can configure them to stream data from various sources to various destinations.
What is a “stream” and what does it look like?
For the purposes of this article, we will maintain the following loose definition of a "stream":
✍🏻 A stream is a sequence of potentially unlimited chunks of data made available over time.
A stream can be thought of as items on a conveyor belt being processed one at a time rather than all at once.
So essentially, we want to have a data type that can do two things:
Define a sequence of events to process one at a time
Allow reading one chunk at a time
Oftentimes you will see this referred to as "lazy" computing, and it's very common across many big data frameworks such as Dask and Spark. "Lazy" here refers to the fact that a chunk of data is only loaded and processed when it is asked for, rather than precomputing everything up front and then processing each element individually.
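To make laziness concrete, here is a minimal sketch (the range size and the expensive_transform function are made up for illustration): the list computes and stores every element up front, while the generator produces elements only when asked.
def expensive_transform(x):
    # Stand-in for any costly per-element computation
    return x * 2

# Eager: every element is computed and held in memory before any is used
eager = [expensive_transform(x) for x in range(1_000_000)]

# Lazy: nothing is computed yet; each element is produced only when requested
lazy = (expensive_transform(x) for x in range(1_000_000))

print(next(lazy))  # computes only the first element: 0
print(next(lazy))  # computes only the second element: 2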
As a data engineer, you’ve definitely used these sorts of datatypes before.
Let's say you've loaded a CSV file with pandas with chunking turned on; well, pandas will use a streaming-like object in the background to facilitate this!
import pandas as pd

# Example of reading a CSV file in chunks with pandas
chunk_size = 1000
for chunk in pd.read_csv('large_file.csv', chunksize=chunk_size):
    process(chunk)  # Replace with your processing logic
Well… to be fair, if we are talking strictly canonical Python, you may be aware that we are specifically talking about iterators.
Now you may say: "Hey, but I already know those, I'm outta here".
Yes, you may be aware of iterators, but there's a surprising number of ways you can leverage them to stream data to and from sources in ways you may not expect.
If you don't know what iterators are, the following section is for you.
A way to iterate and iterate and iterate
Feel free to skip this section if you are familiar enough with iterators.
An iterable is any Python object capable of returning its elements one at a time, allowing it to be iterated over in a loop. Examples include lists, tuples, sets, dictionaries, and strings. An object is considered iterable if it implements the __iter__() method, which returns an iterator.
my_list = [1, 2, 3]
for item in my_list:
    print(item)
An iterator is an object that represents a stream of data. It returns data one element at a time when the __next__() method is called. Iterators keep track of their current position. An iterator is created by calling the __iter__() method on an iterable, which returns the iterator object itself.
my_list = [1, 2, 3]
iterator = iter(my_list)
print(next(iterator))  # Output: 1
print(next(iterator))  # Output: 2
print(next(iterator))  # Output: 3
# Calling next again will raise a StopIteration exception
The Abstract Base Class (ABC) iterator protocol defines two methods that an object must implement to be considered an iterator:
__iter__() returns the iterator object itself.
__next__() returns the next item in the sequence. If there are no more items, it raises StopIteration.
class MyIterator:
    def __init__(self, data):
        self.data = data
        self.index = 0

    def __iter__(self):
        return self

    def __next__(self):
        if self.index < len(self.data):
            item = self.data[self.index]
            self.index += 1
            return item
        else:
            raise StopIteration

my_iter = MyIterator([1, 2, 3])
for item in my_iter:
    print(item)
A generator is a special type of iterator created using a function that uses the yield statement to return data one item at a time. Generators provide a convenient way to implement iterators without needing to write a full class with __iter__() and __next__() methods.
def my_generator():
    yield 1
    yield 2
    yield 3

gen = my_generator()
for item in gen:
    print(item)
A for loop in Python can be used to iterate over an iterable object, executing the loop body for each item.
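Under the hood, a for loop calls iter() on the object once and then calls next() repeatedly until StopIteration is raised; the snippet below is roughly what the loop expands to.
my_list = [1, 2, 3]
iterator = iter(my_list)       # the for statement calls __iter__() once
while True:
    try:
        item = next(iterator)  # then __next__() on every pass
    except StopIteration:
        break                  # the loop ends when the iterator is exhausted
    print(item)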
The yield statement is used in a generator function to return data without terminating the function. The function's state is saved, allowing the generator to resume where it left off when next() is called again.
def count_up_to(max):
    count = 1
    while count <= max:
        yield count
        count += 1

counter = count_up_to(3)
print(next(counter))  # Output: 1
print(next(counter))  # Output: 2
print(next(counter))  # Output: 3
# Calling next again will raise StopIteration
By understanding these components, you can effectively use and create iterators and generators in Python to handle sequences of data efficiently.
Using iterators for streaming data from various sources
In this section we will go over a few examples of common sources and destinations you may want to stream data from and to as a data engineer.
Streaming Data from an API with httpx.stream
To stream data from an API, you can use the httpx library. This is particularly useful for downloading large files or handling continuous data feeds.
import httpx

def stream_data_from_api(url, chunk_size=1024):
    with httpx.stream('GET', url) as response:
        for chunk in response.iter_bytes(chunk_size):
            yield chunk

# Example usage
url = 'https://example.com/large_file.zip'
for chunk in stream_data_from_api(url):
    # Process each chunk
    print(chunk)
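As a quick follow-up, the chunks can be written straight to disk so that only one chunk is ever held in memory at a time (the output filename here is just illustrative):
# Stream the download directly into a local file, one chunk at a time
with open('large_file.zip', 'wb') as out_file:
    for chunk in stream_data_from_api(url):
        out_file.write(chunk)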
Streaming Data to and from S3 with S3FS
For handling data in Amazon S3, the s3fs library allows you to stream data directly from S3 buckets without downloading the entire file.
import s3fs

def stream_data_from_s3(bucket_name, file_path, chunk_size=1024):
    fs = s3fs.S3FileSystem()
    with fs.open(f'{bucket_name}/{file_path}', 'rb') as file:
        while chunk := file.read(chunk_size):
            yield chunk

# Example usage
bucket_name = 'my-bucket'
file_path = 'large_file.csv'
for chunk in stream_data_from_s3(bucket_name, file_path):
    # Process each chunk
    print(chunk)
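s3fs can stream in the other direction too. Below is a minimal sketch of writing chunks to S3, where the bucket name and the source of chunks are placeholders; s3fs buffers the writes and uploads the object behind the scenes, so the full file never sits in memory.
import s3fs

def stream_data_to_s3(chunks, bucket_name, file_path):
    fs = s3fs.S3FileSystem()
    # Open the S3 object for writing and push each chunk as it is produced
    with fs.open(f'{bucket_name}/{file_path}', 'wb') as file:
        for chunk in chunks:
            file.write(chunk)

# Example usage: relay a large download into S3 chunk by chunk
# stream_data_to_s3(stream_data_from_api(url), 'my-bucket', 'large_file.zip')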
Streaming Data to a Data Warehouse with Snowflake Connector
When it comes to streaming data to a data warehouse like Snowflake, I have a separate article going in depth into how to use the Snowflake connector to do so. Here I will illustrate a few ways in which we can stream data with the Snowflake connector.
When retrieving data, we can retrieve an iterator of pandas DataFrames, for example. With each iteration, the connector will download the next chunk of result data:
from collections.abc import Generator
from pandas import DataFrame

cursor = connection.cursor()
cursor.execute("SELECT * FROM my_table")  # a query must be executed first (table name is illustrative)
results = cursor.fetch_pandas_batches()
assert isinstance(results, Generator)

for result in results:
    assert isinstance(result, DataFrame)
    break
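As a usage sketch (the connection parameters, table name, and output file are all placeholders), each batch can be appended to a local CSV so that only one chunk of the result set is ever in memory at a time:
import snowflake.connector

# Connection parameters are placeholders
with snowflake.connector.connect(
    user='...', password='...', account='...',
    warehouse='...', database='...', schema='...',
) as connection:
    with connection.cursor() as cursor:
        cursor.execute("SELECT * FROM my_table")
        for i, batch in enumerate(cursor.fetch_pandas_batches()):
            # batch is a pandas DataFrame holding only this chunk of the results
            batch.to_csv('output.csv', mode='a', header=(i == 0), index=False)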
It's also possible to stream data TO Snowflake by supplying it with a file stream object, like so:
with connection:
    if file_path is not None:
        counter = 0
        cursor_execution_config.command = f"PUT file://{snowflake_file_name}_{counter} @{full_qualified_stage_name} OVERWRITE = {overwrite}"
        with open(file_path, "rb") as f:
            if chunk_size == 0:
                raise ValueError(
                    "Chunk size of 0 is not allowed for filestream pushing."
                )
            elif chunk_size > 0:
                lines = []
                # NOTE: This only works for file types that are splittable line by line, like CSVs.
                for line in f:
                    lines.append(line)
                    if len(lines) >= chunk_size:
                        counter += 1
                        # Wrap the accumulated lines in an in-memory file object and PUT it to the stage
                        filestream = io.BytesIO(b"".join(lines))
                        cursor = cursor.execute(
                            **cursor_execution_config.dict(),
                            file_stream=filestream,
                        )
                        lines = []
                if lines:
                    # Push whatever is left over as the final chunk
                    counter += 1
                    filestream = io.BytesIO(b"".join(lines))
                    cursor = cursor.execute(
                        **cursor_execution_config.dict(),
                        file_stream=filestream,
                    )
                    debug(f"Final chunk of {len(lines)} lines pushed.")
    else:
        cursor = cursor.execute(**cursor_execution_config.dict())
    if isinstance(cursor, SnowflakeCursor):
        # SnowflakeExecuteEvent(vars(self.cursor)).log()
        logger.info(f"cursor is {cursor}")
    else:
        logger.info(f"SQL execution results is {cursor}")
    return self
Streaming Data with SFTP using Paramiko
The paramiko library allows you to stream data to and from an SFTP server, which is useful for transferring large files securely.
import paramiko

def stream_data_from_sftp(hostname, port, username, password, remote_path, chunk_size=1024):
    transport = paramiko.Transport((hostname, port))
    transport.connect(username=username, password=password)
    sftp = paramiko.SFTPClient.from_transport(transport)
    try:
        with sftp.open(remote_path, 'rb') as file:
            while chunk := file.read(chunk_size):
                yield chunk
    finally:
        # Close the connection even if the consumer stops iterating early
        sftp.close()
        transport.close()

# Example usage
hostname = 'sftp.example.com'
port = 22
username = 'user'
password = 'pass'
remote_path = '/path/to/large_file'
for chunk in stream_data_from_sftp(hostname, port, username, password, remote_path):
    # Process each chunk
    print(chunk)
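The same approach works in the other direction; below is a minimal sketch (connection details and paths are placeholders) that streams chunks up to an SFTP server by writing to the remote file handle as they arrive:
import paramiko

def stream_data_to_sftp(chunks, hostname, port, username, password, remote_path):
    transport = paramiko.Transport((hostname, port))
    transport.connect(username=username, password=password)
    sftp = paramiko.SFTPClient.from_transport(transport)
    try:
        # Open the remote file for writing and push each chunk as it is produced
        with sftp.open(remote_path, 'wb') as remote_file:
            for chunk in chunks:
                remote_file.write(chunk)
    finally:
        sftp.close()
        transport.close()

# Example usage: relay a large file from S3 to SFTP without holding it all in memory
# stream_data_to_sftp(stream_data_from_s3('my-bucket', 'large_file.csv'),
#                     'sftp.example.com', 22, 'user', 'pass', '/path/to/large_file.csv')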
By using these libraries and methods, you can effectively implement streaming data interfaces to handle large datasets efficiently across various platforms and services.