Welcome back! In the previous chapter, Storage Layer (The Filing Cabinet), we learned how memU stores data safely.
Now, we need to talk about logic.
In many applications, the logic for a task like "Memorize this file" is written as one giant, 500-line function. It reads the file, parses it, calls an AI, saves to a database, and logs the result.
But what if you want to change just one part? What if you want to add a step to translate the text into Spanish before the AI reads it? In a "giant function" design, you would have to rewrite the core code. That is messy and dangerous.
memU uses a Workflow Pipeline Engine. It breaks complex tasks into small, interchangeable Lego blocks called Steps.
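Imagine a car factory assembly line, where each station does one job and hands the car to the next station.
Now, imagine a customer wants a Sunroof. Instead of rebuilding the whole line, the factory adds one station.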
The Pipeline Engine allows you to insert, remove, or replace steps in the Memorize Pipeline or Retrieve Pipeline dynamically.
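To make the assembly-line picture concrete, here is a rough sketch in plain Python. It does not use memU's classes: the station functions and the `run_line` helper are made up for illustration. The point is only that when a pipeline is an ordered list of small functions, adding the sunroof is a list insertion rather than a rewrite.

```python
# Hypothetical sketch: a pipeline as an ordered list of (name, function) pairs.
# None of these names come from memU; they only illustrate the idea.

def weld_frame(car):
    car.append("frame")
    return car

def paint_body(car):
    car.append("paint")
    return car

def install_sunroof(car):
    car.append("sunroof")
    return car

def run_line(stations, car):
    """Pass the car through every station in order."""
    for _name, station in stations:
        car = station(car)
    return car

stations = [("weld", weld_frame), ("paint", paint_body)]
standard_car = run_line(stations, [])

# Add a station before "paint" without touching the other stations.
paint_index = [name for name, _ in stations].index("paint")
custom_stations = list(stations)
custom_stations.insert(paint_index, ("sunroof", install_sunroof))
custom_car = run_line(custom_stations, [])

print(standard_car)  # ['frame', 'paint']
print(custom_car)    # ['frame', 'sunroof', 'paint']
```

The original `stations` list is left alone, so the standard car can still be built next to the custom one.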
A WorkflowStep is a small, isolated unit of work. It does one thing well.
For example:
- extract_items (finds facts).
- persist_index (saves to database).
How does the "Extraction Worker" pass data to the "Database Worker"? They share a dictionary called the Workflow State. Think of it as a clipboard attached to the car moving down the line.
- The clipboard starts as: {"text": "Hello"}
- A step reads text and writes: {"embedding": [0.1, 0.9]}
The PipelineManager is the system that organizes the workers. It ensures that if Worker B needs a wrench, Worker A has already put a wrench on the clipboard.
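The clipboard idea can be tried out in a few lines. This is a minimal sketch, not memU's runner: `embed_text`, `count_dims` and `run_steps` are hypothetical names, the `(context, state)` signature is borrowed from the custom-step example in this chapter, and the embedding values are placeholders.

```python
import asyncio

# Hypothetical steps. The (context, state) signature is an assumption
# taken from the custom-step example in this chapter.
async def embed_text(context, state):
    text = state["text"]  # KeyError here if nobody put "text" on the clipboard
    # Placeholder values, not a real embedding of `text`.
    state["embedding"] = [0.1, 0.9] if text else []
    return state

async def count_dims(context, state):
    # Reads "embedding", which only exists because embed_text ran first.
    state["dims"] = len(state["embedding"])
    return state

async def run_steps(steps, state, context=None):
    """Run each step in order, handing the same dict from one to the next."""
    for step in steps:
        state = await step(context, state)
    return state

final_state = asyncio.run(run_steps([embed_text, count_dims], {"text": "Hello"}))
print(final_state)
# {'text': 'Hello', 'embedding': [0.1, 0.9], 'dims': 2}
```

If the two steps are swapped, `count_dims` fails with a `KeyError` at run time. That late failure is what an up-front ordering check is meant to prevent.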
Here is how memU visualizes a workflow modification:
You can change how memU thinks without touching the source code. You do this through the MemoryService.
Let's say you want to log which resource is being processed right before the categorize_items step runs.
from memu.workflow.step import WorkflowStep
# 1. Define your custom worker
async def my_logger(context, state):
    print(f"I am processing: {state['resource'].id}")
    return state  # Pass the clipboard to the next person
# 2. Wrap it in a Step
log_step = WorkflowStep(
    step_id="custom_logger",
    run=my_logger,
    requires={"resource"},  # Safety check: needs 'resource' to exist
)
Now, inject it into the live service:
# 3. Insert it into the "memorize" assembly line
service.insert_step_before(
    pipeline="memorize",
    target_step_id="categorize_items",  # The existing step name
    new_step=log_step,
)
# Now, whenever you call service.memorize(), your print statement will run!
Other operations:
- replace_step: Swap a default component with your own custom version.
- remove_step: Delete a step you don't need (e.g., "I don't want to save raw resources").
The PipelineManager is very strict. It prevents crashes before they happen.
When you try to register or modify a pipeline, the Manager simulates the flow to ensure the "Clipboard" (State) always has the required data.
Let's look at src/memu/workflow/pipeline.py to see how it manages these lists.
The manager doesn't just overwrite the list; it creates a new Revision. This is safer for debugging.
from dataclasses import dataclass
from typing import Any
@dataclass
class PipelineRevision:
    name: str
    revision: int  # e.g., Version 1, Version 2...
    steps: list[WorkflowStep]
    metadata: dict[str, Any]
When you modify the pipeline, memU uses a "Mutator." This is a function that takes the old list and safely transforms it into the new list.
# Inside PipelineManager
def insert_after(self, name, target_id, new_step):
    # Define HOW to change the list
    def mutator(steps):
        for index, step in enumerate(steps):
            if step.step_id == target_id:
                # Insert the new step right after the target
                steps.insert(index + 1, new_step)
                return
        raise KeyError(f"Step {target_id} not found")
    # Apply the change and create a new revision
    return self._mutate(name, mutator)
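The snippet above calls `_mutate` without showing it, so here is one way such a helper could work. Treat it as a guess under stated assumptions, not memU's implementation: `TinyManager`, `Step`, `Revision`, `register`, `latest` and the `_history` dictionary are all hypothetical. The sketch copies the latest step list, lets the mutator edit the copy, and stores the result as a new numbered revision.

```python
from dataclasses import dataclass, field
from typing import Any, Callable

@dataclass
class Step:
    # Stand-in for WorkflowStep with only the field this sketch needs.
    step_id: str

@dataclass
class Revision:
    # Mirrors the fields of PipelineRevision shown earlier.
    name: str
    revision: int
    steps: list[Step]
    metadata: dict[str, Any] = field(default_factory=dict)

class TinyManager:
    """Hypothetical manager that keeps every revision instead of overwriting."""

    def __init__(self):
        self._history: dict[str, list[Revision]] = {}

    def register(self, name, steps):
        self._history[name] = [Revision(name, 1, list(steps))]

    def latest(self, name):
        return self._history[name][-1]

    def _mutate(self, name, mutator: Callable[[list[Step]], None]):
        current = self.latest(name)
        steps = list(current.steps)  # copy, so the old revision stays intact
        mutator(steps)               # if this raises, nothing is saved
        new = Revision(name, current.revision + 1, steps)
        self._history[name].append(new)
        return new

    def insert_after(self, name, target_id, new_step):
        def mutator(steps):
            for index, step in enumerate(steps):
                if step.step_id == target_id:
                    steps.insert(index + 1, new_step)
                    return
            raise KeyError(f"Step {target_id} not found")
        return self._mutate(name, mutator)

manager = TinyManager()
manager.register("memorize", [Step("extract_items"), Step("persist_index")])
rev2 = manager.insert_after("memorize", "extract_items", Step("custom_logger"))

print(rev2.revision, [s.step_id for s in rev2.steps])
# 2 ['extract_items', 'custom_logger', 'persist_index']
print([s.step_id for s in manager._history["memorize"][0].steps])
# ['extract_items', 'persist_index']
```

Because the mutator works on a copy, a failed change (for example, a target step that does not exist) leaves the history untouched, and revision 1 is still there to compare against.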
Validation is the most critical part. Every step declares what it requires (input keys) and what it produces (output keys).
def _validate_steps(self, steps, initial_keys):
    available_keys = set(initial_keys)  # Start with what user provides
    for step in steps:
        # 1. Check if the clipboard has what this step needs
        missing = step.requires - available_keys
        if missing:
            raise ValueError(f"Step '{step.step_id}' is missing: {missing}")
        # 2. Add what this step creates to the pile
        available_keys.update(step.produces)
This ensures you can't insert a step that tries to read a variable that hasn't been created yet!
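To see the check reject a badly placed step, here is a small self-contained run. The `Step` dataclass is a minimal stand-in for WorkflowStep, `audit_items` is a hypothetical custom step, and the `requires`/`produces` keys given to `extract_items` and `persist_index` are assumptions made for this demo, not memU's real declarations.

```python
from dataclasses import dataclass, field

@dataclass
class Step:
    # Minimal stand-in for WorkflowStep: just the declared inputs and outputs.
    step_id: str
    requires: set = field(default_factory=set)
    produces: set = field(default_factory=set)

def validate_steps(steps, initial_keys):
    """Same walk as _validate_steps above, written as a free function."""
    available_keys = set(initial_keys)
    for step in steps:
        missing = step.requires - available_keys
        if missing:
            raise ValueError(f"Step '{step.step_id}' is missing: {missing}")
        available_keys.update(step.produces)

# Assumed key names, for illustration only.
extract = Step("extract_items", requires={"resource"}, produces={"items"})
persist = Step("persist_index", requires={"items"}, produces={"saved"})
audit = Step("audit_items", requires={"items"})  # hypothetical custom step

# Placed after extraction: "items" is already on the clipboard, so this passes.
validate_steps([extract, audit, persist], initial_keys={"resource"})

# Placed before extraction: "items" does not exist yet, so this is rejected.
try:
    validate_steps([audit, extract, persist], initial_keys={"resource"})
    error = None
except ValueError as exc:
    error = str(exc)

print(error)  # Step 'audit_items' is missing: {'items'}
```

The check only reads the declared key sets and never calls a step, so a bad ordering is caught before any real work happens.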
In this chapter, we learned that memU is not a static block of code. It is a flexible factory.
This architecture is what makes memU a framework, not just a library. You can bend it to fit your specific needs.
There is one final component we haven't discussed deeply yet. The pipelines rely heavily on calling AI models (LLMs). But how do we handle different providers like OpenAI, Anthropic, or local models smoothly?
We need a translator.
Next Chapter: LLM Client Wrapper (The Translator)
Generated by Code IQ