Stack Overflow Asked by Chukwuma Nwaugha on December 16, 2021
I am trying to create a smart queue that collects data every second, or every time I call .next().
After every x seconds, say x = 5, it should dispatch all the items currently in the queue while still receiving new ones.
The idea is to batch the HTTP requests to the server: instead of sending each item as it arrives every second, a batch of items is sent every x = 5 seconds.
The snippet below is what I have tried. I am accumulating the values with the scan operator, but I need a combination of operators that takes everything from the queue every x = 5 seconds.
import { Subject, Subscription, Observable } from 'rxjs'
import { concatMap, mergeMap, map, delay, scan } from 'rxjs/operators'
import { Video, seconds } from '../types'

interface Queue {
  addMoment: (data: { t: seconds }) => void
  unsubscribe: () => void
}

type Moment = {
  t: seconds
}

const PER_BATCH = 5
const INTERVAL = PER_BATCH * 1000

export class MomentsQueue implements Queue {
  private queue = new Subject()
  private observer: Observable<any>
  private subscriber: Subscription | undefined

  constructor(private video: Video) {
    this.observer = this.queue.pipe(
      map((payload) => {
        console.log(payload)
        return payload as Moment
      }),
      scan((all: Moment[], current) => [...all, current], []),
      concatMap(
        (payload) =>
          new Promise((resolve) => {
            console.log({ payload })
            resolve(true)
          }),
      ),
    )
  }

  private subscribe() {
    this.subscriber = this.observer.subscribe()
    // ?? this.unsubscribe() // based on any chosen event
  }

  unsubscribe() {
    this.subscriber?.unsubscribe()
  }

  addMoment(data: Moment) {
    if (!this.subscriber || this.subscriber.closed) this.subscribe()
    this.queue.next({ t: data.t })
  }
}

export default MomentsQueue
I was able to solve this using @TalOhania's help. See the solution in the code snippet below:
import { Subject, Subscription, Observable } from 'rxjs'
import { concatMap, map, bufferTime } from 'rxjs/operators'
import { Video, seconds } from '../types'

interface Queue {
  addMoment: (data: { t: seconds }) => void
  unsubscribe: () => void
}

type Moment = {
  t: seconds
}

const PER_BATCH = 5
const INTERVAL = PER_BATCH * 1000

export class MomentsQueue implements Queue {
  private queue = new Subject()
  private observer: Observable<any>
  private subscriber: Subscription | undefined

  constructor(private video: Video) {
    this.observer = this.queue.pipe(
      map((payload) => payload as Moment),
      // Collect everything pushed during each INTERVAL window into one array.
      // Note: bufferTime also emits an empty array for a window with no items.
      bufferTime(INTERVAL),
      // concatMap waits for one batch to finish before starting the next.
      concatMap(
        (payload) =>
          new Promise((resolve) => {
            // send the payload to the server.
            console.log({ payload })
            resolve(true)
          }),
      ),
    )
  }

  private subscribe() {
    this.subscriber = this.observer.subscribe()
    // ?? this.unsubscribe() // based on any chosen event
  }

  unsubscribe() {
    this.subscriber?.unsubscribe()
  }

  addMoment(data: Moment) {
    // Subscribe lazily, on the first item or after a previous unsubscribe.
    if (!this.subscriber || this.subscriber.closed) this.subscribe()
    this.queue.next({ t: data.t })
  }
}

export default MomentsQueue
Answered by Chukwuma Nwaugha on December 16, 2021
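To make the collect-then-flush cycle concrete, here is a rough sketch of the same batching idea in plain TypeScript, without RxJS. It is not how bufferTime is implemented; BatchQueue, add, flush, stop and the send callback are hypothetical names chosen for illustration. Unlike bufferTime, this sketch skips empty batches, which may or may not be what you want.

```typescript
type Moment = { t: number }

// Hypothetical helper, not part of RxJS: collects items and hands them to
// `send` in batches, either on a timer or when flush() is called directly.
class BatchQueue<T> {
  private items: T[] = []
  private timer: ReturnType<typeof setInterval> | undefined
  private send: (batch: T[]) => void
  private intervalMs: number

  constructor(send: (batch: T[]) => void, intervalMs: number) {
    this.send = send
    this.intervalMs = intervalMs
  }

  add(item: T): void {
    // Start the timer lazily, on the first item.
    if (this.timer === undefined) {
      this.timer = setInterval(() => this.flush(), this.intervalMs)
    }
    this.items.push(item)
  }

  flush(): void {
    // Assumption: an empty window should not trigger a request.
    if (this.items.length === 0) return
    const batch = this.items
    this.items = [] // new items go into a fresh batch
    this.send(batch)
  }

  stop(): void {
    if (this.timer !== undefined) clearInterval(this.timer)
    this.timer = undefined
    this.flush() // hand over whatever is still queued
  }
}
```

A possible usage: create the queue with `new BatchQueue<Moment>((batch) => console.log(batch), 5000)`, call `add({ t: 1 })` as items arrive, and call `stop()` when the queue is no longer needed so the timer is cleared and the remaining items are sent.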