I built Laminar Flow to solve a common frustration with traditional workflow engines - the rigid need to predefine all node connections. Instead of static DAGs, Flow uses a dynamic task queue system that lets workflows evolve at runtime.
Flow is built on 3 core principles:
* Concurrent Execution - Tasks run in parallel automatically
* Dynamic Scheduling - Tasks can schedule new tasks at runtime
* Smart Dependencies - Tasks can await results from previous operations
All tasks share a thread-safe context for state management.
This architecture makes it surprisingly simple to implement complex patterns like map-reduce, streaming results, cycles, and self-modifying workflows. Perfect for AI agents that need to make runtime decisions about their next actions.
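In spirit, the whole engine is just a queue that tasks themselves feed. Here's a toy, single-threaded model of the idea (not Flow's actual code -- the real engine drains the queue with a thread pool and makes the context thread-safe):

    import queue

    context = {}          # shared state; Flow's version is thread-safe
    work = queue.Queue()  # the dynamic task queue

    def route():
        context["query"] = "summarize these docs"
        # decide the next steps at runtime -- no predefined edges
        work.put(fetch)
        work.put(summarize)

    def fetch():
        context["docs"] = ["doc1", "doc2"]

    def summarize():
        print("summarizing", context["docs"])

    work.put(route)
    while not work.empty():   # Flow drains this concurrently instead
        work.get()()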
Flow is lightweight and elegantly written, and the engine itself has zero dependencies.
Behind the scenes it's a ThreadPoolExecutor, which is more than enough to handle concurrent execution, considering that the majority of AI workflows are IO-bound.
To make it possible to wait for the completion of previous tasks, I added a semaphore for each state value. Once the value is set, one permit is released on the semaphore.
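Schematically, the mechanism looks like this (a standalone sketch of the idea, not the actual Flow source):

    import threading
    from concurrent.futures import ThreadPoolExecutor

    class State:
        # get() blocks until set() has released a permit for that key.
        def __init__(self):
            self._values = {}
            self._sems = {}
            self._lock = threading.Lock()

        def _sem(self, key):
            with self._lock:  # guard the dict of semaphores itself
                return self._sems.setdefault(key, threading.Semaphore(0))

        def set(self, key, value):
            self._values[key] = value
            self._sem(key).release()   # the one permit per set

        def get(self, key):
            sem = self._sem(key)
            sem.acquire()              # block until the value exists
            sem.release()              # let any other waiters through
            return self._values[key]

    state = State()
    with ThreadPoolExecutor(max_workers=4) as pool:
        pool.submit(state.set, "name", "Ada")
        print(pool.submit(state.get, "name").result())  # -> Ada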
The project also comes with built-in OpenTelemetry instrumentation for debugging and state reconstruction.
Give it a try here -> https://github.com/lmnr-ai/flow. Just pip install lmnr-flow (or uv add lmnr-flow). More examples are in the readme.
Looking forward to feedback from the HN community! Especially interested in hearing about your use cases for dynamic workflows.
A couple of things are on the roadmap, so contributions are welcome!
* Async function support
* TS port
* Some consensus on how to handle task ids when the same task is spawned multiple times
Would be interesting to see a complex agent implementation in both Flow and regular LangGraph to compare maintainability.
As you said, even the easiest conditional workflows are tedious, and we have to rely on "conditional" edges to solve the simplest stuff. And how would you define cycles, or spawning the same task multiple times, as a node-edge relation? It becomes extremely inconvenient and just gets in your way. The reason I built Flow is exactly that I previously relied on a predefined node-edge system, which I also built (https://docs.lmnr.ai/pipeline/introduction).
Also, it's impossible to do MapReduce-like operations on a node-edge system without inventing some special node to handle that stuff.
The idea of comparing LangGraph and Flow is really good, and I'll work on it ASAP! What kinds of workflows would you love to see implemented in Flow? Btw, there are a lot of examples in the readme and in the /tests folder.
What might be new to those developing tooling with AI might not be new in other areas of software.
I'm intrigued by the ability to start execution from a particular task.
One thing I like about LangGraph is the declarative state merging. In the MapReduce example, how do you guarantee that the collector.append() operation is thread-safe?
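Naively, I'd expect an explicit lock around the shared list, something like:

    import threading

    collector = []
    collector_lock = threading.Lock()

    def collect(item):
        # CPython's list.append happens to be atomic under the GIL,
        # but that's an implementation detail, not a guarantee.
        with collector_lock:
            collector.append(item)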
Motivated by more or less the same frustration you've laid out here.
I have a meta question though - there seems to be a huge amount of activity in this space of LLM agent related developer tooling. Are people actually successfully and reliably delivering services which use LLMs? (Meaning services which are not just themselves exposing LLMs or chat style interfaces).
It seems like 18 months ago people were running around terrified, but now almost the opposite.
but also many chat bots and assistants too
I think something similar applies here: you see way more dev tools than successful products because people can't build many successful products using LLMs. Building a devtool (the old-fashioned way) is something that people can tangibly do.
The ReAct paradigm for agents makes me think of standard workflows, because they're not that different conceptually: read the state of the world, plan / "reason" based on user intent, perform actions that mutate it, then go back and read the state of the world if the problem hasn't been fully solved (i.e., start another workflow). Similar in concept to reading from dbs, computing logic, mutating / writing to a new table, and triggering a follow-on job in data engineering.
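In pseudocode the parallel looks something like this (every helper here is a hypothetical stand-in):

    def run_agent(goal, llm, tools):
        state = read_world_state()          # hypothetical: query dbs/APIs
        while not solved(goal, state):      # hypothetical completion check
            plan = llm.reason(goal, state)  # plan based on user intent
            state = act(plan, tools)        # perform mutating actions
        return state                        # each loop is "another workflow"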
Workflows are currently only started by human events, but I'm going to move toward additional workflows started through an activity soon. I'm keeping the state of the world independent of the execution engine, so each workflow will read the new state.
Moreover, there isn't an example of how it `could` work for inference or function_calls/tools/agents.
But it looks simple to get started with, and it feels like it has the right foundation for good things to come. Kudos!
I focused on the agent use case because Flow was initially built for that internally. But you're right: for the release I made it extremely general, meant to be used as a foundation for complex systems.
I would say that unless agent based examples are highly compelling, it makes more sense to simply remove the agent stuff completely from the pitch, lest you be inevitably accused of taking advantage of the current AI hype for an otherwise unrelated piece of technology (that happens to be a good fit for agent based workflows - something which I haven't observed to even work very well with the best models).
For some reason, LLM-specific examples just slipped my mind, because I really wanted to show the barebones nature of this engine and how powerful it is despite its simplicity.
But you're also right: it's general enough that you can build any task-based system, or rebuild a complex system with a task architecture, on Flow.
The signal is clear: add more agent-specific examples.
It supports tasks, dynamic routes, and parallel execution in pure Python built-ins (zero deps). But it's just a side project, so there's no persistence; it's just an easy tool.
The challenge with this approach, though, is that you need to run the actual code to see what it does, or, as a developer, build up a mental model of the code ... but it does shine in certain use cases -- and it also reminds me of https://github.com/insitro/redun, because it takes this approach too.
While my project is not meant to be used specifically with LLMs, what I built (or am building) is a system that has no specific predefined sequence, but is rather a composition of "actions", each of which has a specific defined requirement in the form of a data structure necessary to execute that "action". I built it to be self-supervising, without one single task-scheduling mechanism, to enable people to write data-driven applications.
It's nice to see that I'm not the only one trying to go this way (yeah, I didn't expect that to be the case, don't worry, I'm not Elon-Musk-crazy).
While I built it for completely different use cases (when I started, LLMs weren't as big a thing as they are now), it's definitely a cool and creative way to use such an architecture.
Gl hf :) and thumbs up
1. Deadlocks
2. Programmer Experience
3. Updating the code with in-flight tasks
To avoid deadlocks, it seems like the executor would need to know what the dependencies are before attempting to execute tasks. If we have 4 threads of execution, it seems like we could get into a state where all 4 of our threads are blocking on semaphores, each waiting for another task to provide a dependent value. And at scale, if it can happen, it definitely will happen eventually. Potentially related: it could make sense for the engine to give preference to completing partially completed tasks before starting fresh ones.
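Concretely, the starvation I have in mind, stripped of anything Flow-specific (warning: this script intentionally hangs):

    import threading
    from concurrent.futures import ThreadPoolExecutor

    ready = threading.Event()

    def consumer():
        ready.wait()  # stands in for blocking on a dependency's semaphore

    with ThreadPoolExecutor(max_workers=4) as pool:
        for _ in range(4):
            pool.submit(consumer)  # all 4 workers are now blocked
        pool.submit(ready.set)     # the producer is queued but never runs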
Also, I wonder if there's a way to lay out the specification of tasks so it looks more like normal code, potentially with async/await. What I mean by normal code is this: it's much more natural to write a program with 2 steps like this:
    name = await ask_for_name()
    await greet(name)
than to redo that in a task way like this:

    def ask_for_name():
        ...
        return TaskResult(next_task=greet(name))

    def greet():
        ...
If I have 7 steps with some loops and conditionals, it becomes much more difficult to grok the structure with the disjointed task structure, and much more difficult to restructure and reorganize it. I wonder if there's a way to pull it off where the distributed task structure is still there but the code feels more natural. Using this framework, we're essentially writing the code in a fragmented way that's challenging to reorganize and reason about.
And what will happen when we change the task code and deploy a new version? I wonder what happens to the tasks that were in flight at the moment we deploy.
1. Tasks are not explicitly called from another task. In your example, greet() is never called; instead, a task with id=greet is pushed to the queue (see the sketch at the end of this comment).
2. The reason I opted for the distributed task approach is precisely to eliminate chains of await task_1, await task_2, ...
Going back to point 1, task_1 just tells the engine: ok buddy, now it's time to spawn task_2. With these semantics we isolate tasks and don't end up with outer tasks that call other tasks. Parallel task execution is also extremely simple with this approach.
3. Deadlocks will happen iff you wait for data that is never assigned, which is expected. Otherwise, by the design of the state and the engine itself, they will never happen.
https://github.com/lmnr-ai/flow/blob/main/src/lmnr_flow/stat...
https://github.com/lmnr-ai/flow/blob/main/src/lmnr_flow/flow...
4. On your last point, I would argue the opposite is true: it's actually much harder to maintain and add new changes when you hardcode everything, which is why this project exists in the first place.
5. Regarding deployment: Flow is not Temporal-like (yet), everything is in-memory, but I will definitely look into how to make it more robust.
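To make point 1 concrete, the spawn-instead-of-call version of your example looks roughly like this (illustrative names only, not the exact API; the readme has real usage):

    # Illustrative sketch: ask_for_name never calls greet() directly;
    # it only tells the engine which task id to push onto the queue.
    def ask_for_name(ctx):
        ctx.set("name", "Ada")
        return TaskResult(next_task="greet")

    def greet(ctx):
        print("hello,", ctx.get("name"))  # blocks until "name" is set
        return TaskResult(next_task=None)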