hiro99ma blog

rust: tokio::broadcast と tokio::mpsc の recv はちょっと違う

tokio::spawn() したスレッド間の通知に tokio::mpsc::channel を使っていた。
その後、ブロードキャストしたいシーンがあって tokio::broadcast を追加したのだが、 tx.send() を実行しているのに rx.recv().await が全然動作しない。
どちらも常駐していて loop {} させていて、tx.send() する方は thread::sleep() で 10秒おきくらいに送信していた。

Gemini氏に聞いてみると send 後に tokio::task::yield_now() するとよかろうと。
やってみると、確かに recv.await が解除されて進んだ。
えー、なんでー。

tokio::spawn() は Linuxでいうスレッドではない

なんとなくそうじゃないかと思っていたのだが、違うそうだ。
Go でいうところの goroutine、いわゆる coroutine というのかな?
軽量スレッドというところか(昔はスレッドが”軽量プロセス”って呼び方だった。pthreadではなかったかも。)。

だから、というわけでもないかもしれないが thread::sleep() したからといってスレッドが切り替えされるわけではないそうだ。 send と recv のタスクが同じスレッドで動いていたら、いくらスリープしても切り替わる効果はない(同じスレッド全部が止まる)。 tokio::task::yield_now() は tokio のスケジューラに処理を返すので、今回のような場合に効果がある。

tokio::mpsc ではそういうことを気にしなかったのだが、これは tx に対して rx が遅いと tx がサスペンドするらしい。 “Backpressure(背圧制御)” というらしい。

・・・と Gemini氏は言う。
さんざん AI にだまされてきたので信用しづらいが、少なくとも理屈は合いそうだし、実際動いた。
tokio::mpsc には “backpressure” とは書いてあるが、 それが処理の切り替えどうこうとは書いていない。容量が上限に達したらとかそういう文脈で書いてある。

“backpressure” は、処理能力を超えたデータが送られてきたときに送信元に対して流量制限を掛けるしくみや圧力をいうらしい。

tokio::broadcast からは読み取れないなあ。

ChatGPT 氏には thread::sleep() だとスレッド自体ブロックするから tokio::time::sleep().await; にしなされ、という提案をもらった。
yield_now() とどちらがよいかは中身次第だが、tokio のものは tokio に返しなさい、というところか。

tokio-console

alloy という Ethereum 系のネットワークにアクセスするライブラリを使っている。
コマンドライン引数で単独の処理を動かして終わり、という場合には特に問題なかったのだが、 tokio::spawn() で常駐した処理から実行すると正しく動かないことがあった。
常に失敗するというわけではなく、RUST_LOG でログを有効にしているとほぼ動いていて、なんとなくログを外したら動かない、という嫌な感じ。
mutex で排他しているので間違っていなければ競合の問題ではないと思う。

おそらく、おそらくなのだが、alloy の API を呼び出してから戻ってくるまでの処理とこっちでやっている処理のタイミングが悪いとか そういうやつじゃないのかなあ。

せめて、tokio の使い方なのかそうじゃないのかは切り分けたい。
外側からモニタリングするツールが提供されている。

これでインストールした tokio-console を実行すると 6669番と通信してタスクの状況などが表示される。
⚠️表示の横の数字は warning の数で、⚠️1 の脚注みたいな意味ではなさそう。

tokio::spawn() だけだと名前が出てこずわからないので tokio::task::Builder::new().name("名前").spawn()? のようにするのがよさそう。
Grok には「spawn_named() でいけるッスよ」とだまされたがね。。。
今まで tokio::spawn() でタスクを動かしていたのだが、spawntokio::task::spawn() が本物?
でも tokio のサンプルコードでも tokio::spawn() だな。

実際に困っている状況を見ていると、Alloy の API を呼び出している自分のタスクと alloy-transport というタスク(私が立ち上げたわけではない)の Busy だけが増えていっている。
何をしているのかわからないが、他のタスクは Busy がほぼ変化していないので tokio 関連かどうかは別として他のタスクが立ちふさがっているというわけではなさそうだ。
そのくらいでも助かる。

Global Subscriberがぶつかる

console_subscriber::init(); を使う前に tracing_subscriber::fmt()::init(); がすでにあった。
何となく console_subscriber の方を先に書いていたのだが、それによって Global Subscriber が設定されてしまい trasing_subscriber の方が設定できなくなってしまった。

こんな感じでいけるようだ。

    tracing_subscriber::Registry::default()
        .with(console_subscriber::spawn())
        .with(tracing_subscriber::fmt::layer()
            .with_ansi(false)
            .with_filter(LevelFilter::DEBUG)
        )
        .init();
writer: hiro99ma
tags: Rust言語

 
About me
About me
comment
Comment Form
🏠
Top page
GitHub
GitHub
Twitter
X/Twitter
My page
Homepage