🍩 Doughnut Reader 🍩

The Birth of Parquet

sympathetic.ink

whinvik

79 comments

13 days ago

Fun story time: I was at twitter for a few years and tended to write quick hacks that people wanted to replace with better engineering.

We never had scala thrift bindings, and the Java ones were awkward from Scala, so I wrote a thrift plugin in JRuby that used the Ruby thrift parser and ERb web templates to output some Scala code. Integrated with our build pipeline, worked great for the company.

I also wrote one era of twitter's service deploy system on a hacked up Capistrano.

These projects took a few days because they were dirty hacks, but I still got a below perf review for getting easily distracted, because I didn't yet know how to sell those company-wide projects.

Anyhow, about a month before that team kicked off Parquet, I showed them a columnar format I made for a hackweek based on Lucene's codec packages, and was using to power a mixpanel-alike analytics system.

I'm not sure whether they were inspired or terrified that my hack would reach production, but I like to think I had a small hand in getting Parquet kickstarted.

fizx

12 days ago

Hahaha. Don't ask how the sausage gets made, so long as the sausage machine is readily maintainable by more than one person or has documentation.

Partially explains how murder came to be https://github.com/lg/murder

> below perf review

That's some cheap bullshit. Fuck marketing-oriented corporate engineering.

hi-v-rocknroll

10 days ago

Tangential to the topic but regarding the supposed Snowball Effect there is in real life no such thing. I have pushed large 'snowballs' down slopes --in reality they are snow cylinders as shown in the photo-- and they invariably do not get far. The reason being that when one side of the cylinder randomly thickens slightly with respect to the other side this causes the whole thing to turn in the opposite direction.

For example, if the RHS of your cylinder has a slightly larger radius than the LHS the cylinder will commence turning to the left.

The upshot is the thick side picks up more snow than the thin side and the disparity in radii increases more rapidly still. The cylinder becomes a truncated cone which turns sideways and halts!

colloydi

11 days ago

It is highly dependent on the snow conditions and the recent weather. Sometimes even just a couple of hours are enough to change the conditions to have a good chance of rollerballs. The climate also has an impact; in my experience, more coastal areas have more periods when they form.

And in some cases the rollerballs get too tall for the bonding strength of the snow, so they break into parts that can restart the cycle if the slope is steep enough.

dendrite9

11 days ago

Sounds to me like there's a long blog post waiting to be written.

whinvik

11 days ago

Reading through this blog, it seems to me that Parquet is a lot like ClickHouse's native data format.

The best part of ClickHouse's native data format is that I can use the same ClickHouse queries locally or on a remote server/cluster, and let ClickHouse decide how to use the available resources in the most performant way.

ClickHouse has native (and the fastest) Parquet integration, so I can:

- Query local/s3 parquet data from command line using clickhouse-local.

- Query large amount of local/s3 data programmatically by offloading it to clickhouse server/cluster which can do processing in distributed fashion.

pradeepchhetri

12 days ago

I've been struggling with a tough parquet problem for a few months now.

I have a 15 GB parquet file in an S3 bucket and I need to "unzip" and extract every row from the file to write into my database. The contents of the file are emails and I need to integrate them into our search function.

Is this possible to do without an unreasonable amount of RAM? Are there any affordable services that can help here?

Feel free to contact me (email in bio), happy to pay for a consult at the minimum.

calderwoodra

12 days ago

DuckDB?

https://duckdb.org/2024/03/29/external-aggregation.html

https://duckdb.org/2021/06/25/querying-parquet.html

If your DB is mysql or postgres, then you could read a stream from parquet, transform inline and write out to your DB

https://duckdb.org/2024/01/26/multi-database-support-in-duck...

And an unrelated, but interesting read about the parquet bomb

https://duckdb.org/2024/03/26/42-parquet-a-zip-bomb-for-the-...

chrisjc

12 days ago

I work with pyspark and parquet quite a lot. I never had to deal with parquet outside spark, but this is how I would do this:

- Write a pandas_udf function in pyspark.

- Partition your data into smaller bits so that the pandas_udf does not get too much data at the same time.

Something like:

```
import pandas as pd
from pyspark.sql import SparkSession
import pyspark.sql.functions as f

@f.pandas_udf("int")
def ingest(doc: pd.Series) -> pd.Series:  # doc is a pandas Series holding one batch
    # your processing goes here -> write to DB etc.
    # return a Series of zeros, one per input row, to make Spark happy
    return pd.Series([0] * len(doc))

spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("s3 path")
df = df.repartition(1000)  # bump up this number if you run into memory issues
df = df.withColumn("foo", ingest(f.col("doc_column")))
# Spark is lazy: nothing runs until an action forces "foo" to be computed
df.select(f.sum("foo")).collect()
```

Now the trick is, you can limit how much data is given to your pandas_udf by repartitioning your data. The more the partitions, the smaller the pd.Series that your pandas_udf gets. There's also the `spark.sql.execution.arrow.maxRecordsPerBatch` config that you can set in spark to limit memory consumption.

^ Probably overkill to bring spark into the equation, but this is one way to do it.

You can use a normal udf (i.e `f.udf()`) instead of a pandas_udf, but apparently that's slower due to java <-> python serialization

lonesword

12 days ago

Pyspark is probably the way to go.

I just wanted to mention that AWS Athena eats 15G parquet files for breakfast.

It is trivial to map the file into Athena.

But you can't connect it to anything other than file output. It can still help you, for example, to write the data out in smaller chunks, or to choose another output format such as CSV (although arbitrary email content in a CSV feels like you are set up for parsing errors).

The benefit is that there is virtually no setup cost. And processing cost for a 15G file will be just a few cents.

fifilura

12 days ago

Athena is probably my best bet tbh, especially if I can do a few clicks and just get smaller files. Processing smaller files is a no brainer / pretty easy and could be outsourced to lambda.

calderwoodra

12 days ago

Yeah the big benefit is that it requires very little setup.

You create a new partitioned table/location from the originally mapped file using a CTAS like so:

  CREATE TABLE new_table_name
  WITH (
    format = 'PARQUET',
    parquet_compression = 'SNAPPY',
    external_location = 's3://your-bucket/path/to/output/',
    partitioned_by = ARRAY['partition_column_name']
  ) AS
  -- Athena requires partition columns to come last in the SELECT list
  SELECT *
  FROM original_table_name

If you don't already have a dimension to partition by, you can probably hash a column and partition by the last hex character of the hash to get 16 evenly sized partitions.
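
The hash idea can be sketched in Python. This is only an illustration: MD5 and the `message-N` keys are assumptions, since the thread names neither a hash function nor a column. The last hex digit of a digest takes one of 16 values, so it splits rows into 16 buckets of roughly equal size.

```python
import hashlib
from collections import Counter

def partition_key(value: str) -> str:
    """Return the last hex digit of the MD5 digest: one of 16 possible buckets."""
    return hashlib.md5(value.encode("utf-8")).hexdigest()[-1]

# Hypothetical row keys, for illustration only.
keys = [f"message-{i}" for i in range(16_000)]
counts = Counter(partition_key(k) for k in keys)

print(len(counts))                                 # number of distinct buckets used
print(min(counts.values()), max(counts.values()))  # smallest and largest bucket
```

The same derived column could then be used as the partition column in the CTAS query.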

fifilura

12 days ago

It's been a while (~5yr) since I've done anything with Spark, but IIRC it used to be very difficult to make reliable jobs with the Java or Python APIs due to the impedance mismatch between Scala's lazy evaluation semantics and the eager evaluation of Java and Python. I'd encounter perplexing OOMs whenever I tried to use the Python or Java APIs, so I (reluctantly) learned enough Scala to make the Spark go brr and all was well. Is it still like this?

jcgrillo

12 days ago

Same for me, the only reason to learn Scala was Spark. The Java API was messy. And still today, I like Scala, and many functional languages, but for jumping between projects they are a nightmare, as everything is dense and cluttered.

okr

9 days ago

I can help! Emailing you now :)

Our company (scratchdata.com, open source) is literally built to solve the problem of schlepping large amounts of data between sources and destinations, so I have worked on this problem a lot personally and happy to nerd out about what works.

memset

12 days ago

With my HPC background, I'm wondering quite a bit how 15 GB files came to be considered large data. I'm not a heavy parquet user, but:

- does this decompress to giant sizes?

- can't you split the file easily, because it includes row-based segments?

- why does it take months to solve this for one file?

fock

12 days ago

As a fellow HPC user, I tried a couple of years ago to do a tricky data format conversion using these newfangled tools. I was essentially just taking a huge (multi-terabyte) 3D dataset, transposing it and changing the endianness.

The solutions I was able to put together using Dask and Spark and such were all insanely slow, they just got killed by Slurm without getting anywhere. In the end I went back to good ole' shell scripting with xxd to handle most of the heavy lifting. Finished in under an hour.

The appeal of these newfangled tools is that you can work with data sizes that are infeasible to people who only know Excel, yet you don't need to understand a single thing about how your data is actually stored.

If you can be bothered to read the file format specification, open up some files in a hex editor to understand the layout, and write low-level code to parse the data - then you can achieve several orders of magnitude higher performance.

semi-extrinsic

12 days ago

I think command line tools are going to be fine if all you do is process one row at a time. Or if your data has a known order.

But if you want to do some kind of grouping or for example pivoting rows to columns, I think you will still benefit from a distributed tool like Spark or Trino. That can do the map/reduce job for you in a distributed way.

fifilura

11 days ago

Because most people don’t have an HPC background, aren’t familiar with parquet internals, don’t know how to make their language stream data instead of buffering it all in memory, have slow internet connections at home, are running out of disk space on their laptops, and only have 4 GB of ram to work with after Chrome and Slack take up the other 12 GB.

15 GB is a real drag to do anything with. So it’s a real pain when someone says “I’ll just give you 1 TB worth of parquet in S3”, the equivalent of dropping a billion dollars on someone’s doorstep in $1 bills.

memset

11 days ago

Funny analogy! I loved it. I'm ready to start with ScratchData, which, btw and respectfully, I had never heard of.

Thanks again for sharing your tool and insightful knowledge.

vladsanchez

11 days ago

How do you see the competition from Trino and Athena in your case?

Depends a lot on what you want to do with the data of course, but if you want to filter and slice/dice it, my experience is that it is really fast and stable. And if you already have it on s3, the threshold for using it is extremely small.

fifilura

11 days ago

what is your point? They talked about 15GB of parquet - what does this have to do with 1TB of parquet?

Also: How does the tool you sell here solve the problem - the data is already there and can't be processed (15GB - funny that seems to be the scale of YC startups?)? How does a tool to transfer the data into a new database help here?

fock

11 days ago

> How does a tool to transfer the data into a new database help here?

Maybe because the problem literally is "how to transfer this data into a database"

wodenokoto

10 days ago

Parquet is column oriented and so row-based manipulation can be inefficient

jjtheblunt

11 days ago

> Hierarchically, a file consists of one or more row groups.

https://parquet.apache.org/docs/concepts/

Maybe the file in question only has one row group. Which would be weird, because the creator had to go out of their way to make it happen.

fock

11 days ago

Yep. I use it all the time. But, as you said, it depends on specific layouts, so you can't expect it to be row-convenient.

jjtheblunt

11 days ago

Wouldn't partial reads fix the RAM problem? e.g. something like this: https://stackoverflow.com/a/69888274

It might not be fast, but a quick 1-off solution that you let run for a while would probably do that job. There shouldn't be a need to load the whole file into memory.

martinky24

12 days ago

Have you given DuckDB a try? I'm using it to shuttle some hefty data between postgres and some parquet files on S3 and it's only a couple lines. Haven't noted any memory issues so far

wild_egg

12 days ago

Agreed on DuckDB, fantastic for working with most major data formats

csjh

12 days ago

Took your advice and tried DuckDB. Here's what I've got so far:

```
import duckdb

def _get_duck_db_arrow_results(s3_key):
    # Cap DuckDB at one thread and 1 GB of memory
    con = duckdb.connect(config={'threads': 1, 'memory_limit': '1GB'})
    con.install_extension("aws")
    con.install_extension("httpfs")
    con.load_extension("aws")
    con.load_extension("httpfs")

    con.sql("CALL load_aws_credentials('hadrius-dev', set_region=true);")
    con.sql("CREATE SECRET (TYPE S3,PROVIDER CREDENTIAL_CHAIN);")
    # fetch_record_batch returns a pyarrow RecordBatchReader (about 1024 rows per batch)
    results = con \
        .execute(f"SELECT * FROM read_parquet('{s3_key}');") \
        .fetch_record_batch(1024)
    for index, result in enumerate(results):
        print(index)
    # note: the reader has already been consumed by the loop above
    return results
```

I ran the above on a 1.4 GB parquet file and 15 min later, all of the results were printed at once. This suggests to me that the whole file was loaded into memory at once.

calderwoodra

12 days ago

You asked it to fetch a batch (15min) then iterated over the batch (all at once).

To stream, fetch more batches.

What ddb does to get the batches depends on hand wavey magic around available ram, and also the structure of the parquet.

akdor1154

11 days ago

It's been a long time since I've used python but that sounds like buffering in the library maybe? I use it from Go and it seems to behave differently.

When I'm writing to postgres though I'm doing it entirely inside DuckDB with an `INSERT INTO ... SELECT ...` and that seems to stream it over.

wild_egg

11 days ago

Using polars in Python I've gotten similar to work, using LazyFrame and collect in streaming mode:

```
import json
import polars as pl

# DUMP_NAME is defined elsewhere
df = pl.scan_parquet('tmp/'+DUMP_NAME+'_cleaned.parquet')

with open('tmp/'+DUMP_NAME+'_cleaned.jsonl', mode='w', newline='\n', encoding='utf8') as f:
    for row in df.collect(streaming=True).iter_rows(named=True):
        # drop empty values before writing each row as one JSON line
        row = {k: v for k, v in row.items() if (v is not None and v != [] and v != '')}
        f.write(json.dumps(row, default=str) + '\n')
```

arrowleaf

12 days ago

This collects all into memory, then iterates.

akdor1154

11 days ago

My suggestion is to load each row group individually, as they generally will be much smaller than your total file size. You can do this via pyarrow.parquet.ParquetFile.read_row_group. To truly optimize this for reading from S3, you could use fsspec's open_parquet_file function, which would let you fetch only one row group at a time.

cycrutchfield

12 days ago

I had a similar issue, but for aggregations. The use case was to "compress" large datasets into smaller aggregations for insertion into a costly DB. At first we used DuckDB, but memory became an issue there and we also bumped into a couple of issues with how DuckDB handles arrays. We then moved this workload to clickhouse-local, which was faster and had more fine-tuning options to our liking. In this case, that meant limiting RAM usage with settings such as max_bytes_before_external_group_by.

sagia

11 days ago

I'm puzzled as to why this is a problem that has lasted months. My phone has enough RAM to work with this file in memory. Do not use pyspark, it is unbelievably slow and memory hogging if you hold it even slightly wrong. Spark is for tb-sized data, at minimum.

Have you tried downloading the file from s3 to /tmp, opening it with pandas, iterating through 1000 row chunks, pushing to DB? The default DF to SQL built into pandas doesn't batch the inserts so it will be about 10x slower than necessary, but speeding that up is a quick google->SO away.

392

11 days ago

Try pyarrow.parquet.ParquetFile.iter_batches()

Streams batches of rows

https://arrow.apache.org/docs/python/generated/pyarrow.parqu...

Edit — May need to do some extra work with s3fs too from what I recall with the default pandas s3 reading

Edit 2 — or check out pyarrow.fs.S3FileSystem :facepalm:

dijksterhuis

12 days ago

I've spent many many hours trying these suggestions, didn't have much luck. iter_batches loads the whole file (or some very large amount of it) into memory.

calderwoodra

12 days ago

It sounds like maybe your parquet file has no partitioning. Apart from the iterating over row groups like someone else suggested, I suspect there is no better solution than downloading the whole thing to your computer, partitioning it in a sane way, and uploading it again. It's only 15 GB so it should be fine even on an old laptop.

Of course then you might as well do all the processing you're interested in while the file is on your local disk, since it is probably much faster than the cloud service disk.

semi-extrinsic

12 days ago

What do you mean by the parquet file might have no partitioning? Is the row group size not the implicit partitioning?

okr

9 days ago

Spend a few bucks on an EC2 instance with a few terabytes of RAM for an hour or so. u-3tb1.56xlarge is about $27/hr.

jfim

12 days ago

This is the answer for a one-off or occasional problem unless your time is worthless.

$200 to rent a machine that can run the naive solution for an entire day is peanuts compared to the dev time for a “better” solution. Running that machine for eight hours would only cost enough to purchase about a half day of junior engineer time.

nucleardog

11 days ago

Understand the format of your data.

Look at the parquet file metadata: use whatever tool you want for that. The Python parquet library is useful and supports s3.

How big are your row groups? If it’s one large row group then you will run into this issue.

What’s the number of rows in each row group?

orf

11 days ago

Perhaps look into using dlt from https://dlthub.com, using pyarrow or polars. It handles large datasets well, especially when using generators to process the data in chunks.

bluedemon

12 days ago

You may want to give octosql a try. I was able to read 100GiB parquet files and aggregate them using less than 100MiB RAM and it is pretty fast too

xk3

11 days ago

Maybe check out alteryx designer cloud, airbyte, or estuary?

katamaster818

10 days ago

Should be able to do it with Aws Athena?

Hikikomori

11 days ago

It's very interesting to see how a new "enterprise open source" project is born. The part where right at the start the author knows that they should have more than one company on board, and how the other companies each contribute their part.

Zababa

10 days ago

Following along, the subsequent blog posts in this series progress to Arrow and then to... OpenLineage

I'm curious if anyone has experience with OpenLineage/Marquez or similar they'd like to share

anentropic

11 days ago

Why is it not in Debian? Is there some deep and dark secret?

https://search.debian.org/cgi-bin/omega?DB=en&P=parquet

jjgreen

13 days ago

No-one with the combination of motivation, time and skills needed to get it into Debian. Someone wanted to get a Python implementation in, but it looks like they never found the time.

https://bugs.debian.org/838338

These days Debian packaging has become a bit irrelevant, since you can just shove upstream releases into a container and go for it.

pabs3

11 days ago

If you want to depend on curl|bash setups, sure. If you want to get security updates, good packaging is still required.

mkesper

11 days ago

Parquet is a file format. Should a file format be in Debian?

whinvik

13 days ago

I mean library support for reading and writing it:

  > apt-cache search hdf5 | wc -l
  134
  > apt-cache search netcdf | wc -l
  70
  > apt-cache search parquet | wc -l 
  0

jjgreen

13 days ago

Are there Arrow libraries? I feel like a lot of applications that read Parquet actually outsource raw reads to Arrow.

whinvik

13 days ago

look for polars, https://pola.rs

jjtheblunt

12 days ago

Why did you ignore the link i gave you above?

If you follow that link, you'll see polars and parquet are a large highly configurable collection of tools for format manipulations across many HPC formats. Debian maintainers possibly don't want to bundle the entirety, as it would be vast.

Might this help you, though?

https://cloudsmith.io/~opencpn/repos/polar-prod/packages/det...

jjtheblunt

12 days ago

petre

12 days ago

My question is "why isn't it in Debian?", I ask that since Debian has rather high standards and the absence from Debian suggests some quality issue in available libraries for the format or the format itself.

Are there dark secrets?

jjgreen

12 days ago

Could the dark secret you seek be "debian isn't for bleeding edge packages"?

it's very modern and perhaps hasn't been around long enough to have debian maintainers feel it's vetted.

for instance, documentation for Python bindings is more advanced than for Rust bindings, but the package itself uses Rust at the low level.

jjtheblunt

12 days ago

Parquet is what, 12 years old? Hardly cutting edge. What you say may well be true for polars (I'm not familiar with it), if/when it (or something else) does get packaged I'll give parquet another look ...

jjgreen

12 days ago

Pandas is probably in Debian and it can read parquet files. Polars is fairly new and under active development. It's a python library, I install those in $HOME/.local, as opposed to system wide. One can also install it in a venv. With pip you can also uninstall packages and keep things fairly tidy.

petre

11 days ago

Pandas is in Debian but it cannot read parquet files itself, it uses 3rd party "engines" for that purpose and those are not available in Debian

  Python 3.10.12 (main, Nov 20 2023, 15:14:05) [GCC 11.4.0]
    on linux
  Type "help", "copyright", "credits" or "license" for more 
    information.
  >>> import pandas
  >>> pandas.read_parquet('sample3.parquet')
  Traceback (most recent call last):
    File "<stdin>", line 1, in <module>
    File "/usr/lib/python3/dist-packages/pandas/io/parquet.py", 
        line 493, in read_parquet
      impl = get_engine(engine)
    File "/usr/lib/python3/dist-packages/pandas/io/parquet.py", 
      line 53, in get_engine
    raise ImportError(
      ImportError: Unable to find a usable engine; tried using: 
      'pyarrow', 'fastparquet'.
    A suitable version of pyarrow or fastparquet is required for 
    parquet support.
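
A quick way to check which of the two engines named in the traceback can be imported on a given system. This is only a sketch: `available_parquet_engines` is a hypothetical helper, and it checks importability only, whereas pandas also requires a suitable version.

```python
import importlib.util

def available_parquet_engines():
    """Return the pandas Parquet engines that can be imported here."""
    candidates = ("pyarrow", "fastparquet")
    return [name for name in candidates if importlib.util.find_spec(name) is not None]

engines = available_parquet_engines()
print(engines)
```

An empty list corresponds to the ImportError shown above.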

jjgreen

11 days ago

Yes, i wasn't clear: it's the polars library that's actively changing, so that might be the issue, or just the vast set of optional components configurable on installation, which isn't the normal package manager experience.

FWIW i think i share your general aversion to _not_ using packages, just for the tidiness of installs and removals, though i'm on fedora and macos.

jjtheblunt

12 days ago

Duckdb is probably what you want, though I don't think it's in debian either. It's in Arch though.

marginalia_nu

12 days ago

pandas is a python-centric, tabular data handler that works well in clouds (and desktop Debian). Pandas can read parquet data today, among other libs mentioned. The binary dot-so driver style is single-host centric and not the emphasis of these cloudy projects (and their cloudy funders)

https://pandas.pydata.org/docs/reference/api/pandas.read_par...

https://packages.debian.org/buster/python3-pandas

Perhaps more alarm is called for when this python+pandas and parquet does not work on Debian, but that is not the case today.

ps- data access in clouds often uses the S3:// endpoint . Contrast to a POSIX endpoint using _fread()_ or similar.. many parquet-aware clients prefer the cloudy, un-POSIX method to access data and that is another reason it is not a simple package in Debian today.

mistrial9

12 days ago

Pandas often has significant memory overhead, so it's not uncommon to need ~3-5x the amount of memory as your file size.

Polars and DuckDB are much better about memory management.

datadrivenangel

12 days ago

Also, Polars has a sane and well thought out API, unlike the hot streaming mess that is Pandas.

marton78

12 days ago

As I understand it, pandas can read parquet if the pyarrow or fastparquet packages are available, but that's not the case and attempts to fix that have been underway for several years.

https://bugs.debian.org/cgi-bin/bugreport.cgi?bug=970021

jjgreen

12 days ago

> most basic definition of open source is the code is available for you to read

What crap. That's 'source-available', NOT open-source.

But at least co-option of terminology is an indicator of success.

heroprotagonist

10 days ago