Realtime Producer-Consumer in Rust

Using channels to feed data in realtime to a consumer (for audio or video)

In this video I’m setting up a producer-consumer that can handle audio or video data, where the rate at which the producer creates data is constrained by the rate at which the consumer consumes and processes data.

You can find the full source code here: https://github.com/caveofprogramming/rust/

To do this I’m implementing a data block recycling mechanism: after some initial buffering, the consumer passes used data blocks back to the producer, which refills them with new data and puts them back on the queue as soon as the consumer has made room by reading from it.

Here I’m using crossbeam 0.8, so my Cargo.toml has this in it:
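A minimal dependency entry matching that version would look something like this (assuming no extra features are enabled and nothing else is needed in the manifest):

```toml
[dependencies]
crossbeam = "0.8"
```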

My imports look like this:

I’m using a bounded crossbeam channel, which holds at most a specified number of data blocks. This is what constrains the speed of my producer: once the channel is full, the producer cannot add another block until the consumer reads one.
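To see the capacity limit in isolation, here is a small sketch. It uses the standard library's `sync_channel` as a stand-in for crossbeam's bounded channel so that it runs without any dependencies; the function name `fill_bounded_channel` is made up for illustration.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Pushes up to `attempts` items into a bounded channel that nobody reads
/// from, and returns how many items the channel accepted.
/// (Stand-in: std's sync_channel instead of crossbeam's bounded channel.)
fn fill_bounded_channel(capacity: usize, attempts: usize) -> usize {
    // Keep the receiver alive so the channel stays connected.
    let (tx, _rx) = sync_channel::<usize>(capacity);
    let mut accepted = 0;
    for i in 0..attempts {
        match tx.try_send(i) {
            Ok(()) => accepted += 1,
            // The channel is full: a blocking send() would wait here.
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    accepted
}
```

With a capacity of 4, only the first four items are accepted. A blocking `send()` would simply wait at that point, which is exactly the back-pressure that slows the producer down.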

I’m using Duration here just to limit the speed of the consumer, to simulate processing video or audio at a fixed pace.

Then I have some definitions:

BLOCK_SIZE is the number of elements in a single block of data. In a real application it would more typically be 512, 1024 or similar.

N_BLOCKS is the capacity of the crossbeam channel, i.e. the maximum number of blocks it can hold at once.

Then BLOCK is the block definition itself. A block in this case is an array of BLOCK_SIZE elements of type f64 (but you can use whatever type is useful). I wrap this in a Box smart pointer so that the blocks live on the heap, not the stack, and the channel only passes around pointers to them rather than copying whole arrays.
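As a rough sketch, definitions along these lines would fit the description. The concrete values (4 and 3) are assumptions chosen to keep the example small, the alias is spelled `Block` here to follow Rust naming conventions, and `new_block` and `initial_pool` are hypothetical helpers.

```rust
/// Number of samples in one block (assumed value; 512 or 1024 is more typical).
const BLOCK_SIZE: usize = 4;
/// Capacity of the channel, in blocks (assumed value).
const N_BLOCKS: usize = 3;

/// A heap-allocated, fixed-size array of samples.
type Block = Box<[f64; BLOCK_SIZE]>;

/// Allocates one zero-filled block on the heap.
fn new_block() -> Block {
    Box::new([0.0; BLOCK_SIZE])
}

/// Allocates enough blocks to fill the channel once.
fn initial_pool() -> Vec<Block> {
    (0..N_BLOCKS).map(|_| new_block()).collect()
}
```

Because `Block` is a `Box`, moving one through a channel only moves a pointer, however large BLOCK_SIZE gets.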

My two channels are created like this:

I have one channel for sending data to the consumer from the producer, and a second for ‘recycling’ the blocks by sending them from the consumer back to the producer.
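A sketch of this two-channel arrangement, again using the standard library's `sync_channel` as a stand-in for crossbeam's bounded channel (with crossbeam the equivalent constructor is `crossbeam::channel::bounded`). The constants, the `Link` alias and `make_channels` are illustrative assumptions.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};

const BLOCK_SIZE: usize = 4; // assumed value
const N_BLOCKS: usize = 3; // assumed value
type Block = Box<[f64; BLOCK_SIZE]>;

/// Sending and receiving ends of one bounded channel of blocks.
type Link = (SyncSender<Block>, Receiver<Block>);

/// Creates the data channel (producer to consumer) and the recycle channel
/// (consumer back to producer), both with room for N_BLOCKS blocks.
fn make_channels() -> (Link, Link) {
    let data = sync_channel::<Block>(N_BLOCKS);
    let recycle = sync_channel::<Block>(N_BLOCKS);
    (data, recycle)
}
```

The producer would keep the data sender and the recycle receiver, and the consumer the other two ends.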

In the producer I use send() to send filled blocks to the consumer and recv() to receive recycled blocks back. Both methods block: send() waits until there is space on the queue, and recv() waits until a block is available to be received.
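One possible shape for such a producer is sketched below. It is not the code from the video: it uses std's `SyncSender` and `Receiver` in place of the crossbeam types, assumes the producer allocates the first N_BLOCKS blocks itself during initial buffering, and fills blocks with a simple running counter as placeholder data.

```rust
use std::sync::mpsc::{Receiver, SyncSender};

const BLOCK_SIZE: usize = 4; // assumed value
const N_BLOCKS: usize = 3; // assumed value
type Block = Box<[f64; BLOCK_SIZE]>;

/// Sends `total_blocks` filled blocks to the consumer, reusing recycled
/// blocks once the initial N_BLOCKS have been allocated.
fn producer(data_tx: SyncSender<Block>, recycle_rx: Receiver<Block>, total_blocks: usize) {
    let mut next = 0.0;
    for i in 0..total_blocks {
        let mut block: Block = if i < N_BLOCKS {
            // Initial buffering: allocate a fresh block.
            Box::new([0.0; BLOCK_SIZE])
        } else {
            // Steady state: wait for the consumer to hand a block back.
            match recycle_rx.recv() {
                Ok(block) => block,
                Err(_) => return, // consumer has gone away
            }
        };
        // Placeholder data: a running counter instead of audio or video samples.
        for sample in block.iter_mut() {
            *sample = next;
            next += 1.0;
        }
        // Blocks here while the data channel is full.
        if data_tx.send(block).is_err() {
            return; // consumer has gone away
        }
    }
}
```

Since only N_BLOCKS blocks ever exist, the producer can never get further ahead of the consumer than the capacity of the channel.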

The consumer is as follows. Now I’m using try_recv() to receive data. This returns immediately and will return an error if there’s no data to receive, so I’m relying on the producer successfully keeping up.

Then I use send() to return the data block to the producer. I think this should be OK, but we could also consider making the return queue unbounded (or just really large) to avoid any possible delay.
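A consumer along these lines could be sketched as follows, with the same caveats: std channel types stand in for crossbeam's, the `period` parameter and the returned counters are assumptions added for illustration, and summing the samples is a placeholder for real processing.

```rust
use std::sync::mpsc::{Receiver, SyncSender, TryRecvError};
use std::thread;
use std::time::Duration;

const BLOCK_SIZE: usize = 4; // assumed value
type Block = Box<[f64; BLOCK_SIZE]>;

/// Polls for a block once per `period` and hands each used block back to
/// the producer. Returns (blocks consumed, polls that found no data).
fn consumer(
    data_rx: Receiver<Block>,
    recycle_tx: SyncSender<Block>,
    period: Duration,
) -> (usize, usize) {
    let mut consumed = 0;
    let mut underruns = 0;
    loop {
        match data_rx.try_recv() {
            Ok(block) => {
                consumed += 1;
                // Placeholder for real audio or video processing.
                let _sum: f64 = block.iter().sum();
                // Ignore the error if the producer has already finished.
                let _ = recycle_tx.send(block);
            }
            // The producer did not keep up for this tick.
            Err(TryRecvError::Empty) => underruns += 1,
            // The producer is done and the queue has been drained.
            Err(TryRecvError::Disconnected) => break,
        }
        // Simulate a fixed processing rate.
        thread::sleep(period);
    }
    (consumed, underruns)
}
```

Counting the empty polls makes it easy to check whether the producer really is keeping up.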

Finally, we wait for the threads to finish.
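Putting the pieces together, a compact end-to-end sketch might look like this. It is an illustration under the same assumptions as before (std's `sync_channel` standing in for crossbeam's bounded channel, small made-up constants, counter values as data), and `run_pipeline` is a hypothetical name.

```rust
use std::sync::mpsc::{sync_channel, TryRecvError};
use std::thread;
use std::time::Duration;

const BLOCK_SIZE: usize = 4; // assumed value
const N_BLOCKS: usize = 3; // assumed value
type Block = Box<[f64; BLOCK_SIZE]>;

/// Runs producer and consumer threads until `total_blocks` blocks have
/// been processed. Returns (blocks consumed, sum of all samples).
fn run_pipeline(total_blocks: usize, period: Duration) -> (usize, f64) {
    let (data_tx, data_rx) = sync_channel::<Block>(N_BLOCKS);
    let (recycle_tx, recycle_rx) = sync_channel::<Block>(N_BLOCKS);

    let producer = thread::spawn(move || {
        let mut next = 0.0;
        for i in 0..total_blocks {
            // Allocate during initial buffering, then reuse recycled blocks.
            let mut block: Block = if i < N_BLOCKS {
                Box::new([0.0; BLOCK_SIZE])
            } else {
                match recycle_rx.recv() {
                    Ok(block) => block,
                    Err(_) => return,
                }
            };
            for sample in block.iter_mut() {
                *sample = next;
                next += 1.0;
            }
            if data_tx.send(block).is_err() {
                return;
            }
        }
    });

    let consumer = thread::spawn(move || {
        let mut consumed = 0usize;
        let mut total = 0.0;
        loop {
            match data_rx.try_recv() {
                Ok(block) => {
                    consumed += 1;
                    total += block.iter().sum::<f64>();
                    let _ = recycle_tx.send(block);
                }
                Err(TryRecvError::Empty) => {}
                Err(TryRecvError::Disconnected) => break,
            }
            thread::sleep(period);
        }
        (consumed, total)
    });

    // Wait for both threads; the consumer's result is returned to the caller.
    producer.join().expect("producer thread panicked");
    consumer.join().expect("consumer thread panicked")
}
```

Calling `run_pipeline(5, Duration::from_millis(1))` processes five blocks while only ever allocating three, because every block after the third is a recycled one.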

The output from running the program looks like this:
