カスタムフィード作成

Blueskyのようなアプリからアクセス可能なアルゴリズムのホスト

カスタムフィード(フィードジェネレーター)は、ユーザーにアルゴリズムを提供するサービスです。これにより、自分好みのタイムライン表示を選んだり、他のユーザー向けに埋め込み可能なフィードを作成したりといった高い自由度が得られます。

このチュートリアルでは、firehoseから投稿を取り込み、アルゴリズムで並び替え、XRPCフィードとして配信することで、カスタムフィードサーバーを作成します。

前提条件

このチュートリアルでは、TypeScriptアプリケーションをゼロから構築します。TypeScriptとNode.jsの基本的な理解を前提としています。

以下をインストールしておいてください。

  • Node.js 18以上
  • Go 1.25以上

homebrewに対応したプラットフォームでは、次のコマンドでインストールできます。

brew install node go

さらに、lex CLIツールをグローバルにインストールしておく必要があります。

npm install -g @atproto/lex tsx

ネットワーク上の投稿をfirehoseで取り込むために、tapのインストールも必要です。

go install github.com/bluesky-social/indigo/cmd/tap

次に、新しいTypeScriptプロジェクトディレクトリを作成し、プロジェクトを初期化します。

mkdir my-agent
cd my-agent
npm init -y
npm i -D typescript @types/node dotenv @atproto/lex @atproto/lex-server @atproto/tap
npx tsc --init --verbatimModuleSyntax false

まず、プロジェクトにLexiconを追加します。

パート1:LexionとTap

Lexiconはatprotoのレコードを定義します。基本的なフィードジェネレーターにはapp.bsky.feed.describeFeedGeneratorapp.bsky.feed.getFeedSkeletonのLexiconが必要です。lexを使えば、これらのLexiconをダウンロードしてプロジェクトに組み込めます。

lex install app.bsky.feed.describeFeedGenerator app.bsky.feed.getFeedSkeleton

ここからlex buildを実行すると、インストール済みLexiconすべてに対してTypeScriptの型を生成できます。

lex build

生成されたコードはsrc/lexiconsに配置されます。

ネットワーク上の投稿をインデックスするために、別ターミナルでtapを実行する必要があります。次のコマンドで開始します。

tap run --disable-acks=true

これでフィードジェネレーターの構築を始められます。

パート2:フィードの構成

ユーザーにカスタムアルゴリズムを提供する一般的なフローは以下の通りです。

  • ユーザーがアプリケーションバックエンド(例:Bluesky)にフィードを要求する
  • アプリケーションが要求されたフィードのDIDドキュメントを見つけ、ホスティングサーバーを特定する
  • アプリケーションがホスティングサーバーへgetFeedSkeletonリクエストを送る
  • フィードのホスティングサーバーが投稿URLの一覧を返す
  • アプリケーションがフィードをハイドレート(ユーザー情報、投稿内容、集計情報などを付加)する
  • アプリケーションがハイドレート済みフィードをユーザーに返す

最初に、環境変数から設定を読み取ります。多くは公開するフィード用の設定(FEED_PUBLISHER_DID, FEED_NAME)ですが、フィードの挙動を決める設定(FEED_MAX_POSTS, SEARCH_TERMS)も含まれます。

この例では、アプリ全体をsrc/index.tsという単一ファイルで作成します。このファイルを作成し、以下のimportと設定行を追加してください。

import { AtUriString, DidString, asDidString } from '@atproto/lex'
import { LexError, LexRouter, serviceAuth } from '@atproto/lex-server'
import { serve } from '@atproto/lex-server/nodejs'
import { Tap, SimpleIndexer } from '@atproto/tap'
import * as app from './lexicons/app.js'

// =============================================================================
// Configuration
// =============================================================================

interface FeedConfig {
    publisherDid: DidString
    feedName: string
    searchTerms: string[]
    maxPosts: number
    port: number
    tapUrl: string
    tapPassword: string
    initialRepos: string[]
}

const DEFAULT_REPO = 'did:plc:ragtjsm2j2vknwkz3zp4oxrd' // pfrazee.com

const config: FeedConfig = {
    publisherDid: asDidString(process.env.FEED_PUBLISHER_DID || 'did:example:alice'),
    feedName: process.env.FEED_NAME || 'whats-alf',
    searchTerms: (process.env.FEED_SEARCH_TERMS || 'alf').split(',').map(s => s.trim()),
    maxPosts: parseInt(process.env.FEED_MAX_POSTS || '1000', 10),
    port: parseInt(process.env.FEED_PORT || '3000', 10),
    tapUrl: process.env.TAP_URL || 'http://localhost:2480',
    tapPassword: process.env.TAP_PASSWORD || 'secret',
    initialRepos: (process.env.FEED_INITIAL_REPOS || DEFAULT_REPO).split(',').map(s => s.trim()),
}

const FEED_URI: AtUriString = `at://${config.publisherDid}/app.bsky.feed.generator/${config.feedName}` as AtUriString

いくつか注目すべき点があります。tapで取得する対象としてDEFAULT_REPOを指定しています。この例では一人のユーザーとしてpfrazee.comを追跡します。これにより、チュートリアルの例を扱いやすくしています。

このフィードは、特定の語句を含む投稿を検索して並び替えます。語句はFEED_SEARCH_TERMS環境変数で設定できます。この例では"alf"に言及する投稿を検索します。

次に、firehoseから投稿を取り込むロジックを設定します。

パート3:firehose取り込み

配信対象の投稿は配列に保存します。

tapがネットワーク上の投稿を検出すると、このフィードサーバーは投稿テキストを設定した検索語で検索します。一致した場合は投稿を配列に追加します。以下の内容をsrc/index.tsに追加してください。

// =============================================================================
// Post Index
// =============================================================================

interface IndexedPost {
    uri: AtUriString
    indexedAt: number
}

const postIndex: IndexedPost[] = []

// =============================================================================
// Tap Indexer
// =============================================================================

const tap = new Tap(config.tapUrl, { adminPassword: config.tapPassword })
const indexer = new SimpleIndexer()

const searchPattern = new RegExp(
    `\\b(${config.searchTerms.map(t => t.replace(/[.*+?^${}()|[\]\\]/g, '\\$&')).join('|')})\\b`,
    'i'
)

indexer.record(async (evt) => {
    if (evt.collection !== 'app.bsky.feed.post') return

    const uri = `at://${evt.did}/${evt.collection}/${evt.rkey}` as AtUriString

    if (evt.action === 'delete') {
        const idx = postIndex.findIndex(p => p.uri === uri)
        if (idx !== -1) {
            postIndex.splice(idx, 1)
            console.log(`DELETE ${uri}`)
        }
        return
    }

    const text = (evt.record?.text as string) || ''
    if (!searchPattern.test(text)) return
    if (postIndex.some(p => p.uri === uri)) return

    postIndex.unshift({ uri, indexedAt: Date.now() })
    if (postIndex.length > config.maxPosts) postIndex.pop()

    const preview = text.substring(0, 60).replace(/\n/g, ' ')
    console.log(`${evt.action.toUpperCase()} ${uri}`)
    console.log(`  "${preview}${text.length > 60 ? '...' : ''}"`)
    console.log(`  ⭐ Added to index (${postIndex.length} total)`)
})

indexer.identity(async (evt) => {
    if (evt.status === 'active') return
    // Remove posts from disabled/deleted identities
    const removed = postIndex.filter(p => p.uri.includes(evt.did)).length
    if (removed > 0) {
        postIndex.splice(0, postIndex.length, ...postIndex.filter(p => !p.uri.includes(evt.did)))
        console.log(`Identity ${evt.did} (${evt.status}): removed ${removed} posts`)
    }
})

indexer.error((err) => console.error('Indexer error:', err))

const channel = tap.channel(indexer)

ここではtapに接続し、@atproto/tapライブラリのSimpleIndexer関数を使ってapp.bsky.feed.post Lexicon、つまりBluesky投稿の受信レコードを処理しています。

新しい投稿が見つかったら、テキストが検索語に一致するか確認します。一致した場合はpostIndex配列に追加します。さらに削除とIDステータス変更イベントも処理し、インデックスの正確性を保ちます。

要するに、条件に一致する投稿のインメモリ検索インデックスを構築し、それをフィードとして配信します。

パート4:フィード配信

次に、フィードを配信するためのXRPCルートを追加します。サーバーは次の2種類のリクエストを扱います。

GET /xrpc/app.bsky.feed.describeFeedGenerator
GET /xrpc/app.bsky.feed.getFeedSkeleton

src/index.tsにフィードジェネレーターサーバーのロジックを追加します。

// =============================================================================
// Feed Generator Server
// =============================================================================

// Auth is optional for this demo since we only log the requester's DID.
// In production, you may want to use credentials to personalize the feed.
const auth = serviceAuth({
    audience: config.publisherDid,
    unique: async () => true,
})

const router = new LexRouter()

router.add(app.bsky.feed.describeFeedGenerator, {
    auth,
    handler: (ctx) => {
        console.log('describeFeedGenerator from', ctx.credentials?.did)
        return {
            body: {
                did: config.publisherDid,
                feeds: [app.bsky.feed.describeFeedGenerator.feed.$build({ uri: FEED_URI })],
                links: {
                    privacyPolicy: 'https://example.com/privacy',
                    termsOfService: 'https://example.com/tos',
                },
            },
        }
    },
})

router.add(app.bsky.feed.getFeedSkeleton, {
    auth,
    handler: (ctx) => {
        if (ctx.params.feed !== FEED_URI) {
            throw new LexError('InvalidRequest', 'Feed not found')
        }
        console.log('getFeedSkeleton from', ctx.credentials?.did)

        const limit = Math.min(ctx.params.limit ?? 50, 100)
        const cursor = ctx.params.cursor as string | undefined

        let startIdx = 0
        if (cursor) {
            const cursorTime = parseInt(cursor, 10)
            startIdx = postIndex.findIndex(p => p.indexedAt < cursorTime)
            if (startIdx === -1) startIdx = postIndex.length
        }

        const slice = postIndex.slice(startIdx, startIdx + limit)
        const feed = slice.map(p => app.bsky.feed.defs.skeletonFeedPost.$build({ post: p.uri }))
        const lastPost = slice.at(-1)
        const nextCursor = lastPost && slice.length === limit && startIdx + limit < postIndex.length
            ? lastPost.indexedAt.toString()
            : undefined

        return { body: { feed, cursor: nextCursor } }
    },
})

ここでは@atproto/lex-serverパッケージのLexRouterを使ってXRPCルートを定義しています。getFeedSkeletonメソッドはレスポンスでcursorを返し、入力としてもcursorパラメータを受け取ります。これによりクライアントはフィードをページネーションできます。

最後に、サーバーを起動するためのランタイムロジックを追加します。

パート5:サーバー起動

channel.start()でtapに接続してからサーバーを起動します。以下のランタイムロジックをsrc/index.tsに追加してください。

// =============================================================================
// Start
// =============================================================================

channel.start()
console.log('Indexer connected to Tap server')

if (config.initialRepos.length > 0) {
    tap.addRepos(config.initialRepos).then(() => {
        console.log(`Added ${config.initialRepos.length} repo(s) to follow\n`)
    })
}

serve(router, { port: config.port }).then((server) => {
    const feedParam = encodeURIComponent(FEED_URI)

    console.log(`
Feed Generator Running

Server: http://localhost:${config.port}
Feed: ${config.feedName}
Terms: ${config.searchTerms.join(', ')}
Tap: ${config.tapUrl}
Repos: ${config.initialRepos.length}

To test (generate a JWT with goat):
goat account service-auth --aud ${config.publisherDid}

Then:
curl -H "Authorization: Bearer <jwt>" "http://localhost:${config.port}/xrpc/app.bsky.feed.getFeedSkeleton?feed=${feedParam}"

Listening for posts matching: ${config.searchTerms.join(', ')}
`)

    const shutdown = async () => {
        console.log('Shutting down...')
        await channel.destroy()
        await server.terminate()
        process.exit(0)
    }

    process.on('SIGINT', shutdown)
    process.on('SIGTERM', shutdown)
})

これでtapへの接続(フィード入力)とXRPCサーバー(フィード出力)の両方が開始されます。フィードジェネレーターのテストに役立つログ出力も備えています。

npx tsx src/index.tsで実行します。初期投稿がインデックスされる様子が表示されるはずです。

Indexer connected to Tap server

Feed Generator Running

Server: http://localhost:3000
Feed: whats-alf
Terms: alf
Tap: http://localhost:2480
Repos: 1

To test (generate a JWT with goat):
goat account service-auth --aud did:example:alice

Then:
curl -H "Authorization: Bearer " "http://localhost:3000/xrpc/app.bsky.feed.getFeedSkeleton?feed=at%3A%2F%2Fdid%3Aexample%3Aalice%2Fapp.bsky.feed.generator%2Fwhats-alf"

Listening for posts matching: alf

Added 1 repo(s) to follow

CREATE at://did:plc:ragtjsm2j2vknwkz3zp4oxrd/app.bsky.feed.post/3jux6xlrdb42v
  "Run?? Alf was the date I struck out with ☹️"
  ⭐ Added to index (1 total)
CREATE at://did:plc:ragtjsm2j2vknwkz3zp4oxrd/app.bsky.feed.post/3jux7x2uvip2v
  "god is that okay? I'm really confused right now, Alf broke m..."
  ⭐ Added to index (2 total)

結論

これで、ネットワークから投稿を取得し、独自アルゴリズムでインデックスし、BlueskyのようなクライアントへXRPC経由で配信するカスタムatprotoフィードを構築できました。

フィードサーバーが動作したので、さらにカスタマイズして他の人が使えるように公開できます。そのためには、公開サーバー(例:AWS、DigitalOceanなど)へデプロイし、HTTPSでアクセス可能にしてください。

その後、公式のfeed-generatorリポジトリをクローンし、publishFeedGen.tsスクリプトを対話的に実行できます。これにより、あなたのフィードサーバーを指すために必要なatprotoレコードが作成され、発見可能になります。

さらに多くのガイドやチュートリアルはGuidesセクションで、より多くのサンプルアプリはCookbookリポジトリで確認できます。ぜひ開発を楽しんでください。