handleEvent to async to deal with increased firehose volume (#81)

* speed up subscription by calling handleEvent async

Moving error handling to src/subscription, this will allow handleEvent to occur in parallel

* Error handling in handleEvent

Verifying getOpsByType within the function now that handleEvent is managed async

* simplify exception handling

* dependency updates

* remove semis

---------

Co-authored-by: Hailey <hailey@blueskyweb.xyz>
Co-authored-by: Daniel Holmgren <dtholmgren@gmail.com>
This commit is contained in:
Bossett 2024-11-01 08:36:17 +11:00 committed by GitHub
parent 5f2d936be2
commit dcaa673ce2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 761 additions and 399 deletions

View file

@ -7,7 +7,13 @@ import { FirehoseSubscriptionBase, getOpsByType } from './util/subscription'
export class FirehoseSubscription extends FirehoseSubscriptionBase { export class FirehoseSubscription extends FirehoseSubscriptionBase {
async handleEvent(evt: RepoEvent) { async handleEvent(evt: RepoEvent) {
if (!isCommit(evt)) return if (!isCommit(evt)) return
const ops = await getOpsByType(evt)
const ops = await getOpsByType(evt).catch(e => {
console.error('repo subscription could not handle message', e)
return undefined
})
if (!ops) return
// This logs the text of every post off the firehose. // This logs the text of every post off the firehose.
// Just for fun :) // Just for fun :)

View file

@ -39,11 +39,7 @@ export abstract class FirehoseSubscriptionBase {
async run(subscriptionReconnectDelay: number) { async run(subscriptionReconnectDelay: number) {
try { try {
for await (const evt of this.sub) { for await (const evt of this.sub) {
try { this.handleEvent(evt)
await this.handleEvent(evt)
} catch (err) {
console.error('repo subscription could not handle message', err)
}
// update stored cursor every 20 events or so // update stored cursor every 20 events or so
if (isCommit(evt) && evt.seq % 20 === 0) { if (isCommit(evt) && evt.seq % 20 === 0) {
await this.updateCursor(evt.seq) await this.updateCursor(evt.seq)

1146
yarn.lock

File diff suppressed because it is too large Load diff