Sun Apr 13, 2025 10:45pm PST
Ask HN: Why don't we have a functional DSL for data+embedding+API pipelines?
I’ve been working on a pretty common problem:

   - I have structured data in JSONL files (in.jsonl, out.jsonl)
   - I match lines by a key
   - I transform them into (text, embedding) pairs
   - I optionally filter/map them
   - I batch them (into chunks of 50)
   - I push each batch into an external system (e.g. a vector DB such as Chroma)

That’s it. Sounds trivial. But it quickly turns into ugly imperative Python code: nested for-loops, global indices, +=, manual batching, line-by-line handling, low-level JSON parsing.

Here’s what it usually looks like in Python:

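```
import json

global_ids = 0  # running index shared across all chunks

with open("in.jsonl", "r") as fin:
    with open("out.jsonl", "r") as fout:

        # Lines are paired by position, so both files must be in the same order
        for in_line, out_line in zip(fin, fout):
            in_data = json.loads(in_line)
            out_data = json.loads(out_line)

            if in_data["custom_id"] != out_data["custom_id"]:
                raise Exception(...)  # error message omitted

            texts = in_data["body"]["input"]
            embeddings = [d["embedding"] for d in out_data["response"]["body"]["data"]]

            for i in range(len(texts)):
                doc = texts[i]
                emb = embeddings[i]

                metadata = {
                    "source": f"chunk-{global_ids}",
                }
                # ... and so on: bump global_ids, batch by hand, insert into the vector DB
```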

We’re in 2025, and this is how we’re wiring data into APIs.

---

Why do we tolerate this?

This is a declarative, streaming, data processing problem. Why aren’t we using something more elegant? Something more composable, like functional pipelines?

I'm asking myself: Why don’t we have a composable, streaming, functional DSL for this kind of task?

---

Why not build it like Unix pipes?

What I want is something that feels like:

   cat input.jsonl \
   | match output.jsonl on custom_id \
   | extract (text, embedding) \
   | filter not-empty \
   | batch 50 \
   | send-to-chroma

---

In Lisp / Clojure:

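   ;; input/output: seqs of parsed JSON maps with keyword keys; send-to-chroma is hypothetical
   (->> (map vector input output)
        (filter (fn [[in out]] (= (:custom_id in) (:custom_id out))))
        (mapcat (fn [[in out]]
                  (map vector
                       (get-in in [:body :input])
                       (map :embedding (get-in out [:response :body :data])))))
        (partition-all 50)
        (run! send-to-chroma))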

---

In Elixir + Broadway (pseudocode; real Broadway pipelines are defined through callbacks, not a pipe chain like this):

   Broadway
   |> read_stream("in.jsonl", "out.jsonl")
   |> match_on(:custom_id)
   |> map(&{&1.text, &1.embedding})
   |> batch_every(50)
   |> send_to_chroma()

---

And now, back to Python...

We’re stuck writing imperative soup or building hacky DSLs with things like:

   load_json_pairs() \
   | where(is_valid) \
   | select(to_embedding_record) \
   | batch(50) \
   | foreach(send_to_chroma)

...or, more realistically, writing thousands of lines of with open(...) as f.
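For context, here is roughly what such a hacky pipe DSL boils down to. This is a minimal sketch, not any particular library's API: `Stage` is a name made up for illustration, and `where`, `select`, `batch`, and `foreach` are just the names from the snippet above, implemented on top of Python's reflected-or hook (`__ror__`).

```python
from itertools import islice


class Stage:
    """Wraps an iterable -> result function so it can sit on the right of `|`."""

    def __init__(self, fn):
        self.fn = fn

    def __ror__(self, upstream):
        # `upstream | stage` ends up here because ranges, lists and
        # generators do not know how to `|` themselves with a Stage.
        return self.fn(upstream)


def where(pred):
    return Stage(lambda items: (x for x in items if pred(x)))


def select(fn):
    return Stage(lambda items: (fn(x) for x in items))


def batch(size):
    def chunks(items):
        it = iter(items)
        # Pull `size` items at a time until the source is exhausted.
        while chunk := list(islice(it, size)):
            yield chunk
    return Stage(chunks)


def foreach(fn):
    # Terminal stage: drains the stream, calls fn per item, returns the count.
    def run(items):
        count = 0
        for item in items:
            fn(item)
            count += 1
        return count
    return Stage(run)


sent = []  # stand-in for the external system
n_batches = (
    range(1, 8)
    | where(lambda x: x % 2 == 1)
    | select(lambda x: x * 10)
    | batch(3)
    | foreach(sent.append)
)
```

Everything before `foreach` is lazy, so nothing is read until the terminal stage pulls items through. It works, but it is exactly the kind of operator-overloading trick I mean by "hacky".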

And even though libraries like tf.data.Dataset, dask.bag, pandas, or pipe exist, none of them really solve this use case in a cohesive and expressive way. They all focus on either tabular data, or big data, or ML input pipelines – not this "structured data -> transform -> push to API" pattern.
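To be fair to plain Python, the closest thing to a streaming pipeline without any library is a chain of generators. The sketch below assumes the record layout from the loop earlier in this post (`custom_id`, `body.input`, `response.body.data[].embedding`) and that both files are in the same order; `read_pairs`, `to_records`, `batched`, and `run_pipeline` are illustrative names, and `sink` is any callable that accepts a list of (text, embedding) tuples.

```python
import json
from itertools import islice


def read_pairs(in_path, out_path):
    """Yield (request, response) dicts, pairing lines by position."""
    with open(in_path) as fin, open(out_path) as fout:
        for in_line, out_line in zip(fin, fout):
            req, resp = json.loads(in_line), json.loads(out_line)
            if req["custom_id"] != resp["custom_id"]:
                raise ValueError(
                    f"custom_id mismatch: {req['custom_id']!r} vs {resp['custom_id']!r}"
                )
            yield req, resp


def to_records(pairs):
    """Flatten each pair into (text, embedding) tuples."""
    for req, resp in pairs:
        texts = req["body"]["input"]
        embeddings = [d["embedding"] for d in resp["response"]["body"]["data"]]
        yield from zip(texts, embeddings)


def batched(items, size):
    """Yield lists of up to `size` items; the last one may be shorter."""
    it = iter(items)
    while chunk := list(islice(it, size)):
        yield chunk


def run_pipeline(in_path, out_path, sink, batch_size=50):
    records = to_records(read_pairs(in_path, out_path))
    non_empty = (r for r in records if r[0].strip())  # drop blank texts
    for chunk in batched(non_empty, batch_size):
        sink(chunk)
```

Something like `run_pipeline("in.jsonl", "out.jsonl", sink=print)` streams both files without loading them into memory. On Python 3.12+ the hand-written `batched` can be swapped for `itertools.batched`, which yields tuples instead of lists. It is better than the nested loop, but the wiring still reads inside-out rather than top-to-bottom.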

---

This is especially absurd now that everyone’s doing RAG

With Retrieval-Augmented Generation (RAG) becoming the norm, we’re all parsing files, extracting embeddings, enriching metadata, batching, and inserting into vector stores.
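The last step, inserting into the vector store, is the same few lines every time. A rough sketch, assuming a collection object whose `add` method accepts `ids`, `documents`, `embeddings`, and `metadatas` keywords (the shape of Chroma's `Collection.add`, as far as I know); `send_to_chroma` here is my own helper, and `RecordingCollection` is a stand-in so the snippet runs without chromadb installed.

```python
class RecordingCollection:
    """Stand-in for a vector store collection; it only records the calls."""

    def __init__(self):
        self.calls = []

    def add(self, ids, embeddings=None, metadatas=None, documents=None):
        self.calls.append(
            {"ids": ids, "embeddings": embeddings,
             "metadatas": metadatas, "documents": documents}
        )


def send_to_chroma(collection, batch, start_id=0):
    """Insert one batch of (text, embedding) pairs; return the next free id."""
    ids = [f"chunk-{start_id + i}" for i in range(len(batch))]
    collection.add(
        ids=ids,
        documents=[text for text, _ in batch],
        embeddings=[emb for _, emb in batch],
        metadatas=[{"source": chunk_id} for chunk_id in ids],
    )
    return start_id + len(batch)


collection = RecordingCollection()
next_id = 0
for batch in [[("a", [0.1]), ("b", [0.2])], [("c", [0.3])]]:
    # Threading the id through the calls replaces the global counter.
    next_id = send_to_chroma(collection, batch, next_id)
```

With the real client, `collection` would come from something like `chromadb.Client().get_or_create_collection("docs")` instead of the stand-in (the collection name is made up).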

Why are we all writing the same low-level, ad-hoc code to do this?

Shouldn’t this entire category of work be addressed by a proper DSL or framework?

Wouldn’t it make sense to build...

   - a functional DSL for JSON-to-embedding-to-API pipelines?
   - or a Python library with proper map, filter, batch, pipe, sink semantics?
   - or even a streaming runtime like Elixir Broadway or a minimal functional Rx-style graph?

Even R with dplyr has more elegant ways to express transformation than what we do in Python for these jobs.

---

Am I missing something?

Is there a tool, a language, or a framework out there that actually solves this well?

Or is this just one of those gaps in the tooling ecosystem that no one has filled yet?

Would love to hear what others are doing – and if anyone’s already working on a solution like this.

Thanks.
