カスタムフィード(フィードジェネレーター)は、ユーザーにアルゴリズムを提供するサービスです。これにより、自分好みのタイムライン表示を選んだり、他のユーザー向けに埋め込み可能なフィードを作成したりといった高い自由度が得られます。
このチュートリアルでは、firehoseから投稿を取り込み、アルゴリズムで並び替え、XRPCフィードとして配信することで、カスタムフィードサーバーを作成します。
前提条件
このチュートリアルでは、TypeScriptアプリケーションをゼロから構築します。TypeScriptとNode.jsの基本的な理解を前提としています。
以下をインストールしておいてください。
- Node.js 18以上
- Go 1.25以上
homebrewに対応したプラットフォームでは、次のコマンドでインストールできます。
brew install node go
さらに、lex CLIツールをグローバルにインストールしておく必要があります。
npm install -g @atproto/lex tsx
ネットワーク上の投稿をfirehoseで取り込むために、tapのインストールも必要です。
go install github.com/bluesky-social/indigo/cmd/tap
次に、新しいTypeScriptプロジェクトディレクトリを作成し、プロジェクトを初期化します。
mkdir my-agent
cd my-agent
npm init -y
npm i -D typescript @types/node dotenv @atproto/lex @atproto/lex-server @atproto/tap
npx tsc --init --verbatimModuleSyntax false
まず、プロジェクトにLexiconを追加します。
パート1:LexionとTap
Lexiconはatprotoのレコードを定義します。基本的なフィードジェネレーターにはapp.bsky.feed.describeFeedGeneratorとapp.bsky.feed.getFeedSkeletonのLexiconが必要です。lexを使えば、これらのLexiconをダウンロードしてプロジェクトに組み込めます。
lex install app.bsky.feed.describeFeedGenerator app.bsky.feed.getFeedSkeleton
ここからlex buildを実行すると、インストール済みLexiconすべてに対してTypeScriptの型を生成できます。
lex build
生成されたコードはsrc/lexiconsに配置されます。
ネットワーク上の投稿をインデックスするために、別ターミナルでtapを実行する必要があります。次のコマンドで開始します。
tap run --disable-acks=true
これでフィードジェネレーターの構築を始められます。
パート2:フィードの構成
ユーザーにカスタムアルゴリズムを提供する一般的なフローは以下の通りです。
- ユーザーがアプリケーションバックエンド(例:Bluesky)にフィードを要求する
- アプリケーションが要求されたフィードのDIDドキュメントを見つけ、ホスティングサーバーを特定する
- アプリケーションがホスティングサーバーへ
getFeedSkeletonリクエストを送る - フィードのホスティングサーバーが投稿URLの一覧を返す
- アプリケーションがフィードをハイドレート(ユーザー情報、投稿内容、集計情報などを付加)する
- アプリケーションがハイドレート済みフィードをユーザーに返す
最初に、環境変数から設定を読み取ります。多くは公開するフィード用の設定(FEED_PUBLISHER_DID, FEED_NAME)ですが、フィードの挙動を決める設定(FEED_MAX_POSTS, SEARCH_TERMS)も含まれます。
この例では、アプリ全体をsrc/index.tsという単一ファイルで作成します。このファイルを作成し、以下のimportと設定行を追加してください。
import { AtUriString, DidString, asDidString } from '@atproto/lex'
import { LexError, LexRouter, serviceAuth } from '@atproto/lex-server'
import { serve } from '@atproto/lex-server/nodejs'
import { Tap, SimpleIndexer } from '@atproto/tap'
import * as app from './lexicons/app.js'
// =============================================================================
// Configuration
// =============================================================================
interface FeedConfig {
publisherDid: DidString
feedName: string
searchTerms: string[]
maxPosts: number
port: number
tapUrl: string
tapPassword: string
initialRepos: string[]
}
const DEFAULT_REPO = 'did:plc:ragtjsm2j2vknwkz3zp4oxrd' // pfrazee.com
const config: FeedConfig = {
publisherDid: asDidString(process.env.FEED_PUBLISHER_DID || 'did:example:alice'),
feedName: process.env.FEED_NAME || 'whats-alf',
searchTerms: (process.env.FEED_SEARCH_TERMS || 'alf').split(',').map(s => s.trim()),
maxPosts: parseInt(process.env.FEED_MAX_POSTS || '1000', 10),
port: parseInt(process.env.FEED_PORT || '3000', 10),
tapUrl: process.env.TAP_URL || 'http://localhost:2480',
tapPassword: process.env.TAP_PASSWORD || 'secret',
initialRepos: (process.env.FEED_INITIAL_REPOS || DEFAULT_REPO).split(',').map(s => s.trim()),
}
const FEED_URI: AtUriString = `at://${config.publisherDid}/app.bsky.feed.generator/${config.feedName}` as AtUriString
いくつか注目すべき点があります。tapで取得する対象としてDEFAULT_REPOを指定しています。この例では一人のユーザーとしてpfrazee.comを追跡します。これにより、チュートリアルの例を扱いやすくしています。
このフィードは、特定の語句を含む投稿を検索して並び替えます。語句はFEED_SEARCH_TERMS環境変数で設定できます。この例では"alf"に言及する投稿を検索します。
次に、firehoseから投稿を取り込むロジックを設定します。
パート3:firehose取り込み
配信対象の投稿は配列に保存します。
tapがネットワーク上の投稿を検出すると、このフィードサーバーは投稿テキストを設定した検索語で検索します。一致した場合は投稿を配列に追加します。以下の内容をsrc/index.tsに追加してください。
// =============================================================================
// Post Index
// =============================================================================
interface IndexedPost {
uri: AtUriString
indexedAt: number
}
const postIndex: IndexedPost[] = []
// =============================================================================
// Tap Indexer
// =============================================================================
const tap = new Tap(config.tapUrl, { adminPassword: config.tapPassword })
const indexer = new SimpleIndexer()
const searchPattern = new RegExp(
`\\b(${config.searchTerms.map(t => t.replace(/[.*+?^${}()|[\]\\]/g, '\\$&')).join('|')})\\b`,
'i'
)
indexer.record(async (evt) => {
if (evt.collection !== 'app.bsky.feed.post') return
const uri = `at://${evt.did}/${evt.collection}/${evt.rkey}` as AtUriString
if (evt.action === 'delete') {
const idx = postIndex.findIndex(p => p.uri === uri)
if (idx !== -1) {
postIndex.splice(idx, 1)
console.log(`DELETE ${uri}`)
}
return
}
const text = (evt.record?.text as string) || ''
if (!searchPattern.test(text)) return
if (postIndex.some(p => p.uri === uri)) return
postIndex.unshift({ uri, indexedAt: Date.now() })
if (postIndex.length > config.maxPosts) postIndex.pop()
const preview = text.substring(0, 60).replace(/\n/g, ' ')
console.log(`${evt.action.toUpperCase()} ${uri}`)
console.log(` "${preview}${text.length > 60 ? '...' : ''}"`)
console.log(` ⭐ Added to index (${postIndex.length} total)`)
})
indexer.identity(async (evt) => {
if (evt.status === 'active') return
// Remove posts from disabled/deleted identities
const removed = postIndex.filter(p => p.uri.includes(evt.did)).length
if (removed > 0) {
postIndex.splice(0, postIndex.length, ...postIndex.filter(p => !p.uri.includes(evt.did)))
console.log(`Identity ${evt.did} (${evt.status}): removed ${removed} posts`)
}
})
indexer.error((err) => console.error('Indexer error:', err))
const channel = tap.channel(indexer)
ここではtapに接続し、@atproto/tapライブラリのSimpleIndexer関数を使ってapp.bsky.feed.post Lexicon、つまりBluesky投稿の受信レコードを処理しています。
新しい投稿が見つかったら、テキストが検索語に一致するか確認します。一致した場合はpostIndex配列に追加します。さらに削除とIDステータス変更イベントも処理し、インデックスの正確性を保ちます。
要するに、条件に一致する投稿のインメモリ検索インデックスを構築し、それをフィードとして配信します。
パート4:フィード配信
次に、フィードを配信するためのXRPCルートを追加します。サーバーは次の2種類のリクエストを扱います。
GET /xrpc/app.bsky.feed.describeFeedGenerator
GET /xrpc/app.bsky.feed.getFeedSkeleton
src/index.tsにフィードジェネレーターサーバーのロジックを追加します。
// =============================================================================
// Feed Generator Server
// =============================================================================
// Auth is optional for this demo since we only log the requester's DID.
// In production, you may want to use credentials to personalize the feed.
const auth = serviceAuth({
audience: config.publisherDid,
unique: async () => true,
})
const router = new LexRouter()
router.add(app.bsky.feed.describeFeedGenerator, {
auth,
handler: (ctx) => {
console.log('describeFeedGenerator from', ctx.credentials?.did)
return {
body: {
did: config.publisherDid,
feeds: [app.bsky.feed.describeFeedGenerator.feed.$build({ uri: FEED_URI })],
links: {
privacyPolicy: 'https://example.com/privacy',
termsOfService: 'https://example.com/tos',
},
},
}
},
})
router.add(app.bsky.feed.getFeedSkeleton, {
auth,
handler: (ctx) => {
if (ctx.params.feed !== FEED_URI) {
throw new LexError('InvalidRequest', 'Feed not found')
}
console.log('getFeedSkeleton from', ctx.credentials?.did)
const limit = Math.min(ctx.params.limit ?? 50, 100)
const cursor = ctx.params.cursor as string | undefined
let startIdx = 0
if (cursor) {
const cursorTime = parseInt(cursor, 10)
startIdx = postIndex.findIndex(p => p.indexedAt < cursorTime)
if (startIdx === -1) startIdx = postIndex.length
}
const slice = postIndex.slice(startIdx, startIdx + limit)
const feed = slice.map(p => app.bsky.feed.defs.skeletonFeedPost.$build({ post: p.uri }))
const lastPost = slice.at(-1)
const nextCursor = lastPost && slice.length === limit && startIdx + limit < postIndex.length
? lastPost.indexedAt.toString()
: undefined
return { body: { feed, cursor: nextCursor } }
},
})
ここでは@atproto/lex-serverパッケージのLexRouterを使ってXRPCルートを定義しています。getFeedSkeletonメソッドはレスポンスでcursorを返し、入力としてもcursorパラメータを受け取ります。これによりクライアントはフィードをページネーションできます。
このカーソルは不透明な値として扱われ、実装はフィードジェネレーター側に完全に委ねられます。AppViewはこの値をクライアントとの間でそのまま中継します。ページネーション時の予期しない挙動を防ぐため、カーソルは各フィード項目に対して一意である必要があります。例えば、次のようなタイムスタンプとCIDを組み合わせたカーソルです。
1683654690921::bafyreia3tbsfxe3cc75xrxyyn6qc42oupi73fxiox76prlyi5bpx7hr72u
最後に、サーバーを起動するためのランタイムロジックを追加します。
パート5:サーバー起動
channel.start()でtapに接続してからサーバーを起動します。以下のランタイムロジックをsrc/index.tsに追加してください。
// =============================================================================
// Start
// =============================================================================
channel.start()
console.log('Indexer connected to Tap server')
if (config.initialRepos.length > 0) {
tap.addRepos(config.initialRepos).then(() => {
console.log(`Added ${config.initialRepos.length} repo(s) to follow\n`)
})
}
serve(router, { port: config.port }).then((server) => {
const feedParam = encodeURIComponent(FEED_URI)
console.log(`
Feed Generator Running
Server: http://localhost:${config.port}
Feed: ${config.feedName}
Terms: ${config.searchTerms.join(', ')}
Tap: ${config.tapUrl}
Repos: ${config.initialRepos.length}
To test (generate a JWT with goat):
goat account service-auth --aud ${config.publisherDid}
Then:
curl -H "Authorization: Bearer <jwt>" "http://localhost:${config.port}/xrpc/app.bsky.feed.getFeedSkeleton?feed=${feedParam}"
Listening for posts matching: ${config.searchTerms.join(', ')}
`)
const shutdown = async () => {
console.log('Shutting down...')
await channel.destroy()
await server.terminate()
process.exit(0)
}
process.on('SIGINT', shutdown)
process.on('SIGTERM', shutdown)
})
これでtapへの接続(フィード入力)とXRPCサーバー(フィード出力)の両方が開始されます。フィードジェネレーターのテストに役立つログ出力も備えています。
npx tsx src/index.tsで実行します。初期投稿がインデックスされる様子が表示されるはずです。
Indexer connected to Tap server
Feed Generator Running
Server: http://localhost:3000
Feed: whats-alf
Terms: alf
Tap: http://localhost:2480
Repos: 1
To test (generate a JWT with goat):
goat account service-auth --aud did:example:alice
Then:
curl -H "Authorization: Bearer " "http://localhost:3000/xrpc/app.bsky.feed.getFeedSkeleton?feed=at%3A%2F%2Fdid%3Aexample%3Aalice%2Fapp.bsky.feed.generator%2Fwhats-alf"
Listening for posts matching: alf
Added 1 repo(s) to follow
CREATE at://did:plc:ragtjsm2j2vknwkz3zp4oxrd/app.bsky.feed.post/3jux6xlrdb42v
"Run?? Alf was the date I struck out with ☹️"
⭐ Added to index (1 total)
CREATE at://did:plc:ragtjsm2j2vknwkz3zp4oxrd/app.bsky.feed.post/3jux7x2uvip2v
"god is that okay? I'm really confused right now, Alf broke m..."
⭐ Added to index (2 total)
この例では単一ユーザーのデータリポジトリだけを取得している点に注意してください。ネットワーク全体を取得するには、tapを停止して--signal-collectionフラグ付きで再実行します。
tap run --collection-filters=app.bsky.feed.post \
--signal-collection=app.bsky.feed.post
結論
これで、ネットワークから投稿を取得し、独自アルゴリズムでインデックスし、BlueskyのようなクライアントへXRPC経由で配信するカスタムatprotoフィードを構築できました。
フィードサーバーが動作したので、さらにカスタマイズして他の人が使えるように公開できます。そのためには、公開サーバー(例:AWS、DigitalOceanなど)へデプロイし、HTTPSでアクセス可能にしてください。
その後、公式のfeed-generatorリポジトリをクローンし、publishFeedGen.tsスクリプトを対話的に実行できます。これにより、あなたのフィードサーバーを指すために必要なatprotoレコードが作成され、発見可能になります。
さらに多くのガイドやチュートリアルはGuidesセクションで、より多くのサンプルアプリはCookbookリポジトリで確認できます。ぜひ開発を楽しんでください。