Chapter 6 · CORE

Workflow Pipeline Engine (The Assembly Line)

Chapter 6: Workflow Pipeline Engine (The Assembly Line)

Welcome back! In the previous chapter, Storage Layer (The Filing Cabinet), we learned how memU stores data safely.

Now, we need to talk about logic.

In many applications, the logic for a task like "Memorize this file" is written as one giant, 500-line function. It reads the file, parses it, calls an AI, saves to a database, and logs the result.

But what if you want to change just one part? What if you want to add a step to translate the text into Spanish before the AI reads it? In a "giant function" design, you would have to rewrite the core code. That is messy and dangerous.

memU uses a Workflow Pipeline Engine. It breaks complex tasks into small, interchangeable Lego blocks called Steps.

The Motivation: The Custom Car Factory

Imagine a car factory assembly line:

  1. Station A: Attach Frame.
  2. Station B: Attach Engine.
  3. Station C: Paint Car.

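Now, imagine a customer wants a Sunroof. In a rigid factory, you would have to tear down and rebuild the whole line. In a modular factory, you add a "Cut Sunroof" station between two existing stations and leave every other station alone.

The Pipeline Engine gives memU the same flexibility: you can insert, remove, or replace steps in the Memorize Pipeline or Retrieve Pipeline at runtime.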

Key Concepts

1. The Step (The Worker)

A WorkflowStep is a small, isolated unit of work. It does one thing well.

2. The State (The Clipboard)

How does the "Extraction Worker" pass data to the "Database Worker"? They share a dictionary called the Workflow State. Think of it as a clipboard attached to the car moving down the line.
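
To make the clipboard idea concrete, here is a minimal sketch. It assumes steps are async functions that take `(context, state)` and return the state, as in the logger example later in this chapter. The step names `extract` and `save` and the keys `text`, `items`, and `saved_count` are hypothetical, chosen only for illustration.

```python
import asyncio

# Hypothetical steps: each reads keys from the shared state and adds its own.
async def extract(context, state: dict) -> dict:
    # Split the raw text into sentences and put them on the clipboard.
    state["items"] = [s.strip() for s in state["text"].split(".") if s.strip()]
    return state

async def save(context, state: dict) -> dict:
    # Pretend to persist the items; record how many were "saved".
    state["saved_count"] = len(state["items"])
    return state

async def run_line(state: dict) -> dict:
    # Hand the same dictionary from one step to the next, in order.
    for step in (extract, save):
        state = await step(None, state)  # no real context in this sketch
    return state

final_state = asyncio.run(run_line({"text": "Alice likes tea. Bob likes coffee."}))
```

Note that `save` only works because `extract` ran first and wrote `items`. That ordering dependency is exactly what the Pipeline Manager checks.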

3. The Pipeline Manager (The Foreman)

This is the system that organizes the workers. It ensures that if Worker B needs a wrench, Worker A has already put a wrench on the clipboard.

Visualizing the Assembly Line

Here is what a workflow modification looks like:

graph LR
    subgraph Original Pipeline
        A[Step 1: Ingest] --> B[Step 2: Extract] --> C[Step 3: Save]
    end
    subgraph Modified Pipeline
        A2[Step 1: Ingest] --> New[**NEW: Translate**]
        New --> B2[Step 2: Extract] --> C2[Step 3: Save]
    end
    style New fill:#ffcc80,stroke:#f57c00,stroke-width:2px

How to Use It: Modifying Behavior

You can change how memU processes data without editing its source code. You do this through the MemoryService.

Scenario: Adding a "logging" step

Let's say you want to print a log line naming the current resource right before the existing categorize_items step runs.

from memu.workflow.step import WorkflowStep

# 1. Define your custom worker
async def my_logger(context, state):
    print(f"I am processing: {state['resource'].id}")
    return state # Pass the clipboard to the next person

# 2. Wrap it in a Step
log_step = WorkflowStep(
    step_id="custom_logger",
    run=my_logger,
    requires={"resource"} # Safety check: needs 'resource' to exist
)

Now, inject it into the live service:

# 3. Insert it into the "memorize" assembly line
service.insert_step_before(
    pipeline="memorize",
    target_step_id="categorize_items", # The existing step name
    new_step=log_step
)

# Now, whenever you call service.memorize(), your print statement will run!
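
To see what this insertion does to the step order, here is a small stand-alone sketch that works on a plain list of step IDs instead of a real service. The helper `insert_before` and every step name except `categorize_items` and `custom_logger` are hypothetical. The actual contents of memU's memorize pipeline are not listed in this chapter.

```python
def insert_before(step_ids: list[str], target_id: str, new_id: str) -> list[str]:
    """Return a new list with new_id placed immediately before target_id."""
    if target_id not in step_ids:
        raise KeyError(f"Step {target_id} not found")
    index = step_ids.index(target_id)
    return step_ids[:index] + [new_id] + step_ids[index:]

# Hypothetical step order; only "categorize_items" is named in this chapter.
memorize_ids = ["ingest", "extract", "categorize_items", "persist"]
updated_ids = insert_before(memorize_ids, "categorize_items", "custom_logger")
```

The original list is left untouched and a new one is returned, which mirrors the revision idea described later in this chapter.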

Other Operations
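
Besides inserting, a pipeline can also have steps replaced or removed. The sketch below shows what those two operations amount to on a plain list. The `Step` class is a stand-in for WorkflowStep, and the helper names `replace_step` and `remove_step` are hypothetical, not memU's method names.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Step:
    # Stand-in for WorkflowStep; only the ID matters for this sketch.
    step_id: str

def _index_of(steps, target_id):
    # Find the position of the step with the given ID, or fail loudly.
    for index, step in enumerate(steps):
        if step.step_id == target_id:
            return index
    raise KeyError(f"Step {target_id} not found")

def replace_step(steps, target_id, new_step):
    # Return a new list with the target swapped for new_step.
    index = _index_of(steps, target_id)
    return steps[:index] + [new_step] + steps[index + 1:]

def remove_step(steps, target_id):
    # Return a new list without the target step.
    index = _index_of(steps, target_id)
    return steps[:index] + steps[index + 1:]

line = [Step("ingest"), Step("extract"), Step("save")]
swapped = replace_step(line, "extract", Step("extract_v2"))
shorter = remove_step(line, "extract")
```

Removing or replacing a step is riskier than inserting one: if a later step requires a key that only the removed step produced, the validation described in the next section would be expected to reject the change.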


Internal Implementation: The Safety Checks

The PipelineManager is strict: it rejects an invalid pipeline at the moment you register or modify it, rather than letting it fail partway through a run.

To do this, the Manager walks through the steps in order and checks that the "Clipboard" (State) holds every key a step requires by the time that step would run.

sequenceDiagram
    participant Dev as Developer
    participant PM as Pipeline Manager
    participant Pipe as Pipeline List
    Dev->>PM: insert_step_after(Step A, Step B)
    PM->>PM: Check: Does Step A exist?
    PM->>PM: Check: Does Step B need data Step A didn't provide?
    alt Validation Passed
        PM->>Pipe: Update List
        PM-->>Dev: Success (Revision 2)
    else Validation Failed
        PM-->>Dev: Error! Missing Keys
    end

Under the Hood: pipeline.py

Let's look at src/memu/workflow/pipeline.py to see how it manages these lists.

1. Storing Revisions

The manager doesn't just overwrite the list; it creates a new Revision. This is safer for debugging.

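from dataclasses import dataclass
from typing import Any

from memu.workflow.step import WorkflowStep

@dataclass
class PipelineRevision:
    name: str
    revision: int           # e.g., Version 1, Version 2...
    steps: list[WorkflowStep]
    metadata: dict[str, Any]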
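
As a rough illustration of "new revision instead of overwrite", the sketch below keeps a history list and appends to it. The `Revision` class mirrors the fields above but stores step IDs rather than steps. The `history` list and the `add_revision` helper are assumptions made for this example, not memU's internals.

```python
from dataclasses import dataclass, field
from typing import Any

@dataclass
class Revision:
    # Mirrors PipelineRevision's fields, with step IDs instead of step objects.
    name: str
    revision: int
    steps: list[str]
    metadata: dict[str, Any] = field(default_factory=dict)

history: list[Revision] = [Revision("memorize", 1, ["ingest", "extract", "save"])]

def add_revision(history: list[Revision], new_steps: list[str]) -> Revision:
    """Append a new revision instead of overwriting the latest one."""
    latest = history[-1]
    new = Revision(latest.name, latest.revision + 1, list(new_steps), dict(latest.metadata))
    history.append(new)
    return new

current = add_revision(history, ["ingest", "translate", "extract", "save"])
```

Because revision 1 is still in `history`, you can compare it against revision 2 when a change behaves unexpectedly.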

2. The Mutator Pattern

When you modify the pipeline, memU uses a "Mutator." This is a function that receives the list of steps and edits it; the manager then stores the result as a new revision.

# Inside PipelineManager
def insert_after(self, name, target_id, new_step):
    
    # Define HOW to change the list
    def mutator(steps):
        for index, step in enumerate(steps):
            if step.step_id == target_id:
                # Insert the new step right after the target
                steps.insert(index + 1, new_step)
                return
        raise KeyError(f"Step {target_id} not found")

    # Apply the change and create a new revision
    return self._mutate(name, mutator)
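
The body of `_mutate` is not shown in this chapter. One plausible way to apply a mutator safely is to run it on a copy, so that a failing mutator leaves the current list untouched. The sketch below assumes that design. `apply_mutator` and `make_insert_after` are hypothetical names, and plain step IDs stand in for WorkflowStep objects.

```python
def apply_mutator(current_steps, mutator):
    """Run mutator on a copy so a failure leaves current_steps untouched."""
    draft = list(current_steps)  # shallow copy of the step list
    mutator(draft)               # may raise, e.g. KeyError for an unknown target
    return draft

def make_insert_after(target_id, new_id):
    # Build a mutator that inserts new_id right after target_id.
    def mutator(steps):
        for index, step_id in enumerate(steps):
            if step_id == target_id:
                steps.insert(index + 1, new_id)
                return
        raise KeyError(f"Step {target_id} not found")
    return mutator

steps_v1 = ["ingest", "extract", "save"]
steps_v2 = apply_mutator(steps_v1, make_insert_after("ingest", "translate"))

# An unknown target raises, and steps_v1 is still intact afterwards.
try:
    apply_mutator(steps_v1, make_insert_after("no_such_step", "translate"))
    failed = False
except KeyError:
    failed = True
```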

3. Validation (The "Requires/Produces" Check)

This is the most critical part. Every step declares what it requires (input keys) and what it produces (output keys).

def _validate_steps(self, steps, initial_keys):
    available_keys = set(initial_keys) # Start with what user provides
    
    for step in steps:
        # 1. Check if the clipboard has what this step needs
        missing = step.requires - available_keys
        if missing:
            raise ValueError(f"Step '{step.step_id}' is missing: {missing}")
            
        # 2. Add what this step creates to the pile
        available_keys.update(step.produces)

This ensures you can't insert a step that tries to read a state key that neither the initial input nor an earlier step has produced!
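
Here is a worked example of that check as a stand-alone sketch. The `Step` class is a stand-in for WorkflowStep that carries only the fields the check reads, `validate_steps` repeats the logic above as a plain function, and the step names and keys are hypothetical.

```python
from dataclasses import dataclass, field

@dataclass
class Step:
    # Stand-in for WorkflowStep with only the fields the check reads.
    step_id: str
    requires: set = field(default_factory=set)
    produces: set = field(default_factory=set)

def validate_steps(steps, initial_keys):
    available_keys = set(initial_keys)  # start with what the caller provides
    for step in steps:
        missing = step.requires - available_keys
        if missing:
            raise ValueError(f"Step '{step.step_id}' is missing: {missing}")
        available_keys.update(step.produces)
    return available_keys

ingest = Step("ingest", requires={"resource"}, produces={"text"})
extract = Step("extract", requires={"text"}, produces={"items"})
save = Step("save", requires={"items"})

# Valid order: every step finds its inputs on the clipboard.
ok_keys = validate_steps([ingest, extract, save], {"resource"})

# Invalid order: "extract" runs before anything has produced "text".
try:
    validate_steps([extract, ingest, save], {"resource"})
    error_message = None
except ValueError as exc:
    error_message = str(exc)
```

The second call fails before any step runs, which is the point of validating at modification time.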


Summary

In this chapter, we learned that memU is not a static block of code. It is a flexible factory.

  1. Steps: Logic is broken into small chunks.
  2. Pipeline: Chunks are arranged in a sequence.
  3. Manager: You can modify the sequence (Insert/Replace/Remove) at runtime.
  4. Safety: The manager checks each step's requires/produces before accepting a change, so a step never runs without its inputs.

This architecture is what makes memU a framework, not just a library. You can bend it to fit your specific needs.

There is one final component we haven't discussed deeply yet. The pipelines rely heavily on calling AI models (LLMs). But how do we handle different providers like OpenAI, Anthropic, or local models smoothly?

We need a translator.

Next Chapter: LLM Client Wrapper (The Translator)


Generated by Code IQ