MastraのAgent Loopを理解する

#mastra #ai-agent #typescript

TL;DR

  • AIエージェント = LLM呼び出しとツール実行を繰り返し、タスクを自律的に達成するシステム
  • Mastraでは do-while ワークフロー で Agent Loop を実装している
  • maxSteps / stopWhen でループ制御、requireToolApproval で承認フローを挟める
  • Human-in-the-Loop は suspend/resume 機能で実現されており、セッションを跨いでも復帰可能

対象読者

  • AIエージェントの「中身」を理解したい方
  • TypeScript/JavaScript でエージェント開発に興味があるエンジニア

この記事では、TypeScript製AIエージェントフレームワーク「Mastra」の内部実装を読み解き、AIエージェントの仕組みを理解していきます。

Mastraとは

Mastraは、AIエージェントを構築するためのTypeScript製のフレームワークです。 AIと言えばPythonというイメージを持たれている方も少なくないのかなと思いますが、MastraのCTOは以下のようなポストをしていました。

@tweet

Pythonは機械学習モデルの訓練に使い、それを届けるのはTypeScriptでやるってことですね。 ほとんどのWebフロントエンドはTypeScriptで実装されることが多いので、Next.jsなどのフルスタックなフロントエンドフレームワークに統合しやすいという点からも、TypeScriptの方がやりやすいのは納得できます。

Mastraの主な特徴は以下の通りです。

  • Agent - LLMとツールを組み合わせてタスクを自律的に実行
  • Workflow - グラフベースのワークフローエンジンで複雑な処理を定義(.then(), .branch(), .parallel()
  • Human-in-the-Loop - エージェントやワークフローを一時停止し、ユーザーの承認を待ってから再開
  • Memory - 会話履歴、セマンティック検索、ワーキングメモリに対応
  • 40以上のプロバイダに対応 - OpenAI、Anthropic、Geminiなどを統一インターフェースで利用可能

同じTypeScript製のAIエージェントツールキットとして、VercelのAI SDKがありますが、こちらはWorkflowやMemoryといった高レベルなAPIは提供されておらず、主にプロバイダーごとのモデル抽象化やAgent Loopの実装などがされており、少しレイヤーが異なります。(ちなみに、MastraはAI SDKに依存しています。)

今回はMastraにおけるAgentクラスの内部実装、特にAgent LoopHuman-in-the-Loopに注目します。

そもそもAIエージェントって何なん?

Anthropicは以下のように述べています。

Agents are systems where LLMs dynamically direct their own processes and tool usage, maintaining control over how they accomplish tasks. エージェントとは、タスクの達成方法について制御を維持しながら、LLMが自らのプロセスとツールの使用を動的に指揮するシステムのことです。

引用元: Building effective agents - Anthropic

事前に定義されたワークフローとは異なり、自律的に行動(ツール実行)しながら、タスクをこなすようなシステムのことを指すようです。

では、このような自律的なAIエージェントはどのように実装することができるんでしょうか。 自律的に動くシステムを実装すると聞くと、難しそうに感じてしまいますが、実はAIエージェントの実装は意外とシンプルで、以下の図のようにLLMの呼び出しとツール実行を反復するような構成になります。(この反復のことをAgent Loopと呼びます)

ここで言う「ツール実行」について、LLM自体はツール実行を”依頼”してくるだけであり、実際に実行するのはアプリケーション側の役目であるということには注意してください(提供するツールの場合はプロバイダ側で実行されます)

Agent Loopの概要 出典: Building effective agents - Anthropic

ここからは具体的にMastraがこれをどのように実装しているのか見ていきます。

MastraのAgent Loopの実装

MastraのAgent Loopは、2つのワークフローで構成されています。

エージェントとワークフローは違うと言ったばかりなので、ちょっとややこしいですね。 自律性はLLMの判断に委ね、その行動パターンの制御構造がワークフローで記述されているということです。

MastraではWorkflowが設計の中心(というか土台)にあるようですね。

Agent Loopのワークフロー定義

Agent Loopのワークフローの定義はこんな感じになっています。(一部、簡略化してます)

// 1回の LLM呼び出し+ツール実行 を行うワークフロー
const agenticExecutionWorkflow = createWorkflow({ id: 'executionWorkflow' })
  .then(llmExecutionStep) // LLM呼び出し
  .map(addResponseToMessageList) // レスポンスを追加
  .map(mapToolCalls) // ツール呼び出しを抽出
  .foreach(toolCallStep, { concurrency: 10 }) // ツール並列実行
  .then(llmMappingStep) // 結果をまとめる
  .commit();

// Agent Loop のワークフロー
const agenticLoopWorkflow = createWorkflow({ id: 'agentic-loop' })
  .dowhile(agenticExecutionWorkflow, async ({ inputData }): boolean => {
    /* 継続条件の評価(後述) */
  })
  .commit();

要らないかもしれないですが、図も一応載せておきます。

flowchart TD
    subgraph AgenticLoopWorkflow["AgenticLoopWorkflow"]
        direction TB

        subgraph AgenticExecutionWorkflow["AgenticExecutionWorkflow"]
            direction TB
            LLM["llmExecutionStep<br/>LLM呼び出し"]
            AddMsg["addResponseToMessageList<br/>レスポンスを追加"]
            MapTools["mapToolCalls<br/>ツール呼び出しを抽出"]
            ExecTools["toolCallStep (foreach)<br/>ツール並列実行"]
            Mapping["llmMappingStep<br/>結果をまとめる"]

            LLM --> AddMsg --> MapTools --> ExecTools --> Mapping
        end

        Mapping --> Check{"isContinued?"}
        Check -->|true| LLM
        Check -->|false| End["終了"]
    end

イメージ通りの素直なワークフローに見えます。 では、もう少し踏み込んで、Agent Loopの継続条件はどのようになっているのかを見ていきます。

Agent Loopはいつ終わるのか

Mastraにおいて、ループを継続するかどうかは、Vercel AI SDKのStopConditionという仕組みで制御されています。 以下に簡略化されたループ継続判定の実装を示します。

export function createAgenticLoopWorkflow(params) {
  const { ...rest } = params;

  // Track accumulated steps across iterations to pass to stopWhen
  const accumulatedSteps: StepResult[] = [];

  return createWorkflow({ id: 'agentic-loop' })
    .dowhile(agenticExecutionWorkflow, async ({ inputData }) => {
      const typedInputData = inputData as LLMIterationData;
      let hasFinishedSteps = false;

      // 今回のステップ結果を構築してaccumulatedStepsに追加
      const currentStep: StepResult = {
        /* ... */
      };
      accumulatedSteps.push(currentStep);

      if (
        rest.stopWhen &&
        typedInputData.stepResult?.isContinued &&
        accumulatedSteps.length > 0
      ) {
        // 全てのstopConditionを並列で実行する
        const conditions = await Promise.all(
          (Array.isArray(rest.stopWhen) ? rest.stopWhen : [rest.stopWhen]).map(
            (condition) => {
              return condition({ steps: accumulatedSteps });
            }
          )
        );
        // 一つでもtrueの条件があれば、hasFinishedStepsがtrueになる
        hasFinishedSteps = conditions.some((condition) => condition);
      }

      // hasFinishedStepsがtrueの場合だけ、isContinuedをfalseに更新
      if (typedInputData.stepResult) {
        typedInputData.stepResult.isContinued = hasFinishedSteps
          ? false
          : typedInputData.stepResult.isContinued;
      }

      // trueの場合のみループが継続される
      return typedInputData.stepResult?.isContinued ?? false;
    })
    .commit();
}

dowhileメソッドの第二引数に渡すコールバック関数は各stepが終了した後に実行されます。 ループ終了条件stopWhenに該当するかどうかをチェックしています。

ループの終了条件の設定方法について

Agentクラスのメソッドgeneratestreamに引数として、maxStepsstopWhenを指定することができます。

maxStepsはnumber型なので、何回ループするかを指定すれば良いだけです。 stopWhenには、AI SDKで定義されるStopConditionという以下のような型を指定することができ、AI SDKが提供する組み込み条件関数stepCountIs, hasToolCallを使用するか、自作して使うこともできます。(詳しくはこちらをご参照ください)

type StopCondition = (context: { steps: StepResult[] }) => boolean;

Agent Loopに承認を挟む

ここまで、ワークフローの全体像とループの継続判定について見てきました。LLMコールの後にツールが実行されるようなワークフローになっていたかと思います。

でも、例えば、ファイル削除などのリスクを伴うツールはLLMの勝手な判断で実行して欲しくありませんよね。 正しい場面で呼び出されるなら良いですが、間違えて無関係なファイルを削除されて、戻せなくなってしまったら大問題になっちゃいます。

このような事態を避けるため、Mastraにはツールの実行前にユーザーに承認を要求するというような機能があります。 このようにAgent Loopの間に人間が介入することは(ツール実行の承認に限らず)Human-in-the-Loopと呼ばれます。

まず使い方を説明し、そのあとで内部実装について見ていきます。

ツール承認の設定方法

AgentクラスのstreamメソッドやgenerateメソッドにはrequireToolApprovalオプションを渡すことができます。

const result = await agent.stream(messages, {
  requireToolApproval: true, // ツール実行前に承認を求める
});

特定のツールだけに承認を要求することもできます。

const dangerousTool = createTool({
  id: 'delete-file',
  requireApproval: true,  // このツールだけ承認必要
  execute: async (args) => { ... }
});

クライアント側でストリームからtool-call-approvalを待ち受け、承認を要求するUIを表示します。

for await (const chunk of stream) {
  if (chunk.type === 'tool-call-approval') {
    // UIで承認ダイアログを表示
    const { toolCallId, toolName, args } = chunk;
  }
}

クライアントから承認または拒否の判断を受け取り、バックエンドで以下を実行することでAgent Loopを再開することが可能です。

// 承認する場合
await agent.approveToolCall({
  runId: stream.runId,
  toolCallId: toolCallId,
});

// 拒否する場合
await agent.declineToolCall({
  runId: stream.runId,
  toolCallId: toolCallId,
});

簡単ですね。

どのように承認フローを実現しているか

承認フローの核心は、上述のagenticExecutionWorkflowにおけるtoolCallStepにあります。 これはMastraのcreateStepで作成されたワークフローステップで、execute関数の中で承認ロジックが実装されています。

ここで注目すべきはsuspendresumeDataです。これらはワークフローエンジンがexecute関数に注入する引数であり、suspend()を呼ぶとワークフローの状態がストレージに保存され、実行が一時停止します。再開時にはresumeDataに承認結果が入っているので、その値に基づいてツールを実行するかどうかを決定します。

execute: async ({ inputData, suspend, resumeData, requestContext }) => {
  const tool = tools[inputData.toolName];
  const requireToolApproval = requestContext.get(
    '__mastra_requireToolApproval'
  );

  if (requireToolApproval || tool.requireApproval) {
    if (!resumeData) {
      // ===== 初回実行(承認待ち) =====

      // tool-call-approval チャンクを送信
      controller.enqueue({
        type: 'tool-call-approval',
        runId,
        payload: { toolCallId, toolName, args },
      });

      // 会話メッセージを永続化
      await flushMessagesBeforeSuspension();

      // ワークフロー状態を永続化(一時停止)
      return suspend({
        requireToolApproval: { toolCallId, toolName, args },
      });
    } else {
      // ===== 再開時 =====
      if (!resumeData.approved) {
        return { result: 'Tool call was not approved by the user' };
      }
    }
  }

  // ツール実行
  const result = await tool.execute(inputData.args, toolOptions);
  return { result, ...inputData };
};

では、クライアントから承認・拒否が送られた後、どのようにワークフローが再開されるのでしょうか。

approveToolCalldeclineToolCallは内部でresumeStreamを呼び出しています。resumeStreamはストレージからスナップショットを読み込み、{ approved: true/false }を渡してワークフローを再開します。

async approveToolCall(options) {
  return this.resumeStream({ approved: true }, options);
}

async declineToolCall(options) {
  return this.resumeStream({ approved: false }, options);
}

async resumeStream(resumeData, options) {
  const existingSnapshot = await this.#mastra?.getStorage()?.loadWorkflowSnapshot({
    workflowName: 'agentic-loop',
    runId: options.runId,
  });

  return this.#execute({
    resumeContext: { resumeData, snapshot: existingSnapshot },
    methodType: 'stream',
  });
}

このように、承認フローはMastraのワークフローが持つsuspend/resume機能を活用して実現されています。 メッセージとスナップショットが永続化されているため、runIdがあれば、たとえサーバーが再起動したとしても、同じ状態から再開することが可能になっています。

最後に承認フローのシーケンス図を載せておきます。

sequenceDiagram
    participant Client as クライアント
    participant Agent as agent.stream()
    participant ToolStep as toolCallStep
    participant Storage as Storage

    Client->>Agent: stream(messages, { requireToolApproval: true })
    Agent->>ToolStep: LLMがツール呼び出しを生成
    ToolStep->>Client: enqueue({ type: 'tool-call-approval' })
    ToolStep->>Storage: suspend() でスナップショット保存

    Note over Client: ユーザーが承認を選択

    Client->>Agent: approveToolCall({ runId, toolCallId })
    Agent->>Storage: loadWorkflowSnapshot()
    Agent->>ToolStep: resume({ approved: true })
    ToolStep->>ToolStep: tool.execute()
    ToolStep->>Client: ツール結果を返す

さいごに

今回はMastraのAgent Loopの全体像、ループ継続判定、ツール承認の仕組みについて解説しました。 Agent LoopのLLMコールの部分や、Workflowの実装には触れることができていないので、また機会があれば記事にしたいと思います。

最後まで読んでいただきありがとうございました!!