4 Tutorials

4.1 Application tutorial  

In this tutorial, we will write an application that builds a pipeline featuring a file source, a TS demux, an avcodec decoder, an avcodec JPEG encoder, and a file sink. The goal of the application is to extract the first picture from a TS file, and write it to disk as a JPEG. The full source code is in the examples/extract_pic.c file.

Upipe environment  

First we start with setting up the global structures required by most Upipe modules:

struct ev_loop *loop = ev_default_loop(0);
struct upump_mgr *upump_mgr = upump_ev_mgr_alloc(loop, UPUMP_POOL, UPUMP_BLOCKER_POOL);
struct umem_mgr *umem_mgr = umem_alloc_mgr_alloc();
struct udict_mgr *udict_mgr = udict_inline_mgr_alloc(UDICT_POOL_DEPTH, umem_mgr, -1, -1);
struct uref_mgr *uref_mgr = uref_std_mgr_alloc(UREF_POOL_DEPTH, udict_mgr, 0);

The first two lines establish an event loop, which will be the event loop of our main thread. In the example we use the libev library. The umem_mgr, udict_mgr and uref_mgr structures are used by Upipe modules to allocate the memory chunks they need (raw memory, attribute dictionaries and packet references, respectively).

File source  

We then start building the pipeline from source to sink, beginning with the file source module:

struct upipe_mgr *upipe_fsrc_mgr = upipe_fsrc_mgr_alloc();
upipe_source = upipe_void_alloc(upipe_fsrc_mgr,
        uprobe_pfx_alloc(uprobe_output_alloc(uprobe_use(logger)),
                         loglevel, "fsrc"));
assert(upipe_source != NULL);
upipe_mgr_release(upipe_fsrc_mgr);
if (!ubase_check(upipe_set_uri(upipe_source, srcpath)))
    exit(EXIT_FAILURE);

In Upipe, each object is attached to a "manager", which is used to allocate objects and perform commands on them. The manager is also useful when there is some kind of global context to maintain, such as the handle of a unique hardware resource. So the first line allocates the manager for file sources. The second line allocates a pipe object. upipe_void_alloc is the function used to allocate pipes which require no extra argument (such as the output format of the pipe). Its first argument is the manager to use, and the second specifies a list of "probes" to attach to the pipe. Probes are used to handle events and log messages, and can be viewed as a hierarchy - lower-level probes are called before higher-level probes. In this case, we allocate a uprobe_pfx probe, whose purpose is to prepend "[fsrc] " to every log message, and a uprobe_output probe, which is used to communicate with the output pipe (more information on that later). The next probe after uprobe_output is the global probe "logger", whose refcount we have to increment via uprobe_use, and whose purpose is to print messages to stderr (more information later).

After the pipe has been allocated, there is no need for the file source manager anymore, so we can release it. In Upipe, most structures are refcounted so the object is not actually freed - it is still in use by the upipe_source object. It will only be freed when upipe_source is released. But the upipe_fsrc_mgr pointer should no longer be accessed after this point.
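The release-after-allocation pattern above can be illustrated with a minimal, standalone refcounting sketch. It does not use Upipe's own struct urefcount; the toy_ names are hypothetical and only show why releasing our pointer does not free an object that another owner still references.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical refcounted object; NOT Upipe's struct urefcount. */
struct toy_obj {
    unsigned int refs;  /* number of live references */
    bool freed;         /* set when the last reference is dropped */
};

/* Initialize with one reference, owned by the caller. */
static void toy_init(struct toy_obj *obj)
{
    assert(obj != NULL);
    obj->refs = 1;
    obj->freed = false;
}

/* Take an extra reference and return the object, so calls can be nested. */
static struct toy_obj *toy_use(struct toy_obj *obj)
{
    assert(obj != NULL && obj->refs > 0);
    obj->refs++;
    return obj;
}

/* Drop one reference; the object is torn down only with the last one. */
static void toy_release(struct toy_obj *obj)
{
    assert(obj != NULL && obj->refs > 0);
    if (--obj->refs == 0)
        obj->freed = true; /* a real implementation would free memory here */
}
```

In this sketch, a manager initialized by the application and then "used" by a pipe survives the application's toy_release; it is only torn down when the pipe drops its own reference.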

The last call, upipe_set_uri, sends a control command to the file source pipe, asking it to open the file srcpath. It returns UBASE_ERR_NONE if the file was opened, or another documented error code otherwise.

TS demux  

At this point we have opened a file, and as it is supposed to be a TS file, we would like to allocate a demux.

struct upipe_mgr *upipe_ts_demux_mgr = upipe_ts_demux_mgr_alloc();
struct upipe_mgr *upipe_mpgvf_mgr = upipe_mpgvf_mgr_alloc();
upipe_ts_demux_mgr_set_mpgvf_mgr(upipe_ts_demux_mgr, upipe_mpgvf_mgr);
upipe_mgr_release(upipe_mpgvf_mgr);
struct upipe_mgr *upipe_h264f_mgr = upipe_h264f_mgr_alloc();
upipe_ts_demux_mgr_set_h264f_mgr(upipe_ts_demux_mgr, upipe_h264f_mgr);
upipe_mgr_release(upipe_h264f_mgr);
struct upipe *ts_demux = upipe_void_alloc_output(upipe_source,
        upipe_ts_demux_mgr,
        uprobe_pfx_alloc(
            uprobe_selflow_alloc(uprobe_use(logger),
                uprobe_selflow_alloc(uprobe_use(logger), &uprobe_catch,
                                     UPROBE_SELFLOW_PIC, "auto"),
                UPROBE_SELFLOW_VOID, "auto"),
            loglevel, "tsdemux"));
assert(ts_demux != NULL);
upipe_mgr_release(upipe_ts_demux_mgr);
upipe_release(ts_demux);

Again, we start by allocating the TS demux manager. Then we allocate other managers for framers (parsers for MPEG-2 and MPEG-4 AVC video formats), which are delivered in separate libraries, and we tell the TS demux manager about them. Then we allocate the TS demux as the output of our file source. When it is done we can release the ts_demux pointer as we won't need to access it again - this doesn't release the object, as it is still in use by the source pipe.

The TS demux is allocated with a collection of probes - uprobe_pfx (which was already discussed) and uprobe_selflow. We'll talk about this one later. There is no uprobe_output, as the ts_demux pipe itself doesn't have any output. Then we enter the process's main event loop:

ev_loop(loop, 0);

This is a libev instruction that starts watchers and triggers callbacks.

Elementary stream selection  

So what happens now? The file source pipe has allocated an idler pump, which is called immediately to read data from the file. The data is then fed to the TS demux, which starts by looking for a PAT (a structure containing the list of programs in the file). When a PAT is found, it emits a UPROBE_SPLIT_UPDATE event, and the list of programs can be read via upipe_split_iterate. We have a probe that does just that: it iterates over the list of programs, and selects the one we'd like to read. This is why we passed the following to the TS demux:

uprobe_pfx_alloc(
    uprobe_selflow_alloc(uprobe_use(logger), [...],
                         UPROBE_SELFLOW_VOID, "auto"),
    loglevel, "tsdemux"));

UPROBE_SELFLOW_VOID indicates that we are looking for a program; "auto" means to select the first available program (otherwise you can specify a comma-separated list of program numbers). On the first detected program, this probe allocates a "subpipe", an object with a struct upipe API which indicates our interest in the program. In turn, the subpipe looks for a PMT (a structure containing the list of elementary streams in the program), and emits UPROBE_SPLIT_UPDATE. Then another uprobe_selflow selects the first video elementary stream:

uprobe_selflow_alloc(uprobe_use(logger), &uprobe_catch,
                     UPROBE_SELFLOW_PIC, "auto"),
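
To make the selection rule concrete, here is a small standalone sketch of how a string such as "auto" or a comma-separated list of numbers could be matched against a flow ID. This is only an illustration of the rule described above, not the actual uprobe_selflow implementation; toy_is_selected and its parameters are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical helper: returns true if flow `id` is selected by `spec`.
 * `spec` is either "auto" or a comma-separated list of decimal IDs.
 * `is_first` tells whether `id` is the first flow that was announced. */
static bool toy_is_selected(const char *spec, uint64_t id, bool is_first)
{
    assert(spec != NULL);
    if (strcmp(spec, "auto") == 0)
        return is_first;

    const char *p = spec;
    while (*p != '\0') {
        char *end;
        unsigned long long value = strtoull(p, &end, 10);
        if (end == p)
            return false;   /* malformed: not a number */
        if (value == id)
            return true;
        if (*end != ',')
            break;          /* end of list, or unexpected separator */
        p = end + 1;
    }
    return false;
}
```

With "auto", only the first announced flow is selected; with "3,5", flows 3 and 5 are selected regardless of the order in which they appear.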

Decoding pipeline  

In all probes, the first argument is the next probe to call if the event wasn't caught. For selflow, the second argument is the probe to pass to subpipes when they are allocated. For the inner uprobe_selflow, we pass &uprobe_catch, an application-defined probe initialized earlier:

struct uprobe uprobe_catch;
uprobe_init(&uprobe_catch, split_catch, uprobe_use(logger));

Where split_catch is a callback:

static int split_catch(struct uprobe *uprobe, struct upipe *upipe,
                       int event, va_list args)
{
    if (event != UPROBE_NEW_FLOW_DEF)
        return uprobe_throw_next(uprobe, upipe, event, args);

    struct upipe *avcdec = upipe_void_alloc_output(upipe, upipe_avcdec_mgr,
            uprobe_pfx_alloc_va(uprobe_output_alloc(uprobe_use(&uprobe_avcdec)),
                                loglevel, "avcdec"));
    if (avcdec == NULL) {
        upipe_err_va(upipe, "incompatible flow def");
        upipe_release(upipe_source);
        return UBASE_ERR_UNHANDLED;
    }
    upipe_release(avcdec);
    return UBASE_ERR_NONE;
}

This probe only catches UPROBE_NEW_FLOW_DEF, which is thrown when the first picture out of the mpgv or h264 framer is sent. We allocate an avcodec decoder.
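
The "catch it or pass it to the next probe" behaviour can be sketched without Upipe as a simple chain of handlers. The structure, event numbers and return codes below are made up for the illustration and are not the real struct uprobe API.

```c
#include <assert.h>
#include <stddef.h>

/* Return codes and events are made up for this sketch. */
enum { TOY_ERR_NONE = 0, TOY_ERR_UNHANDLED = 1 };
enum { TOY_EVENT_LOG = 1, TOY_EVENT_NEW_FLOW_DEF = 2 };

struct toy_probe;
typedef int (*toy_throw_fn)(struct toy_probe *probe, int event);

struct toy_probe {
    toy_throw_fn throw_fn;   /* handler of this probe */
    struct toy_probe *next;  /* probe to try if the event is not caught */
    int caught;              /* number of events handled here */
};

/* Pass an event to the next probe, or report it as unhandled. */
static int toy_throw_next(struct toy_probe *probe, int event)
{
    assert(probe != NULL);
    if (probe->next == NULL)
        return TOY_ERR_UNHANDLED;
    return probe->next->throw_fn(probe->next, event);
}

/* Catches only new-flow-definition events, in the spirit of split_catch. */
static int toy_catch_flow_def(struct toy_probe *probe, int event)
{
    if (event != TOY_EVENT_NEW_FLOW_DEF)
        return toy_throw_next(probe, event);
    probe->caught++;
    return TOY_ERR_NONE;
}

/* Plays the role of the logger at the end of the chain. */
static int toy_catch_log(struct toy_probe *probe, int event)
{
    if (event != TOY_EVENT_LOG)
        return toy_throw_next(probe, event);
    probe->caught++;
    return TOY_ERR_NONE;
}
```

Chaining a toy_catch_flow_def probe in front of a toy_catch_log probe mirrors the way uprobe_catch is initialized with the logger as its next probe: flow definition events stop at the first probe, log events travel on to the second, and anything else comes back unhandled.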

Video processing pipeline  

The avcodec decoder features a special probe, defined as follows:

static int avcdec_catch(struct uprobe *uprobe, struct upipe *upipe,
                        int event, va_list args)
{
    if (event != UPROBE_NEW_FLOW_DEF)
        return uprobe_throw_next(uprobe, upipe, event, args);

    struct uref *flow_def = va_arg(args, struct uref *);

    uint64_t hsize, vsize, wanted_hsize;
    struct urational sar;
    bool progressive;
    if (unlikely(!ubase_check(uref_pic_flow_get_hsize(flow_def, &hsize)) ||
                 !ubase_check(uref_pic_flow_get_vsize(flow_def, &vsize)) ||
                 !ubase_check(uref_pic_flow_get_sar(flow_def, &sar)))) {
        upipe_err_va(upipe, "incompatible flow def");
        upipe_release(upipe_source);
        return UBASE_ERR_UNHANDLED;
    }
    /* width for square pixels, rounded down to an even number */
    wanted_hsize = (hsize * sar.num / sar.den / 2) * 2;
    progressive = ubase_check(uref_pic_get_progressive(flow_def));

    struct uref *flow_def2 = uref_dup(flow_def);
    upipe_use(upipe);

    if (!progressive) {
        uref_pic_set_progressive(flow_def2);
        struct upipe *deint = upipe_void_alloc_output(upipe,
                upipe_filter_blend_mgr,
                uprobe_pfx_alloc(uprobe_output_alloc(uprobe_use(logger)),
                                 loglevel, "deint"));
        assert(deint != NULL);
        upipe_release(upipe);
        upipe = deint;
    }

    if (wanted_hsize != hsize) {
        uref_pic_flow_set_hsize(flow_def2, wanted_hsize);
        struct upipe *sws = upipe_flow_alloc_output(upipe, upipe_sws_mgr,
                uprobe_pfx_alloc_va(uprobe_output_alloc(uprobe_use(logger)),
                                    loglevel, "sws"), flow_def2);
        if (sws == NULL) {
            /* log before dropping our reference on upipe */
            upipe_err_va(upipe, "incompatible flow def");
            upipe_release(upipe);
            uref_free(flow_def2);
            upipe_release(upipe_source);
            return UBASE_ERR_UNHANDLED;
        }
        upipe_release(upipe);
        upipe = sws;
    }

    uref_pic_flow_clear_format(flow_def2);
    uref_flow_set_def(flow_def2, "block.mjpeg.pic.");
    struct upipe *jpegenc = upipe_flow_alloc_output(upipe, upipe_avcenc_mgr,
            uprobe_pfx_alloc_va(uprobe_output_alloc(uprobe_use(logger)),
                                loglevel, "jpeg"), flow_def2);
    assert(jpegenc != NULL);
    upipe_release(upipe);
    upipe_avcenc_set_option(jpegenc, "qmax", "2");
    upipe = jpegenc;

    struct upipe *urefprobe = upipe_void_alloc_output(upipe,
            upipe_probe_uref_mgr,
            uprobe_pfx_alloc_va(uprobe_output_alloc(uprobe_use(&uprobe_uref)),
                                loglevel, "urefprobe"));
    assert(urefprobe != NULL);
    upipe_release(upipe);
    upipe = urefprobe;

    struct upipe *fsink = upipe_void_alloc_output(upipe, upipe_fsink_mgr,
            uprobe_pfx_alloc_va(uprobe_use(logger), loglevel, "jpegsink"));
    assert(fsink != NULL);
    upipe_release(upipe);
    upipe_fsink_set_path(fsink, dstpath, UPIPE_FSINK_OVERWRITE);
    upipe = fsink;

    uref_free(flow_def2);
    upipe_release(upipe);
    return UBASE_ERR_NONE;
}

This probe does the hard work. It allocates a deinterlacer and a scaler (both on an as-needed basis), a JPEG encoder, a uref probe (more on this later) and a file sink. Each pipe is released once the next one has been allocated as its output, so that a single call to upipe_release on the avcodec decoder releases the rest of the pipeline. Note how we use a second flow definition packet (flow_def2) and upipe_flow_alloc_output to specify the output format of the scaler and the JPEG encoder.
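
The wanted_hsize computation in the probe deserves a worked example. The standalone function below reproduces the same integer arithmetic; toy_wanted_hsize is a hypothetical name, and the sample aspect ratios are ordinary values chosen for the illustration rather than taken from the example program.

```c
#include <assert.h>
#include <stdint.h>

/* Width the picture would have with square pixels, given its coded width
 * and sample aspect ratio (sar_num / sar_den), rounded down to an even
 * number of pixels. Same arithmetic as wanted_hsize in avcdec_catch. */
static uint64_t toy_wanted_hsize(uint64_t hsize, uint64_t sar_num,
                                 uint64_t sar_den)
{
    assert(sar_den != 0);
    return (hsize * sar_num / sar_den / 2) * 2;
}
```

For a 720-pixel-wide picture with a 16:15 sample aspect ratio, this gives 720 * 16 / 15 = 768, so a scaler is needed; with a 1:1 ratio the width is unchanged and no scaler is allocated.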

End of processing  

We said we wanted to output only the first frame, and quit afterwards. This magic is performed with the "urefprobe" pipe, allocated in the avcodec probe. This type of pipe throws a custom event (UPROBE_PROBE_UREF) for each incoming uref. Then we just need to write a specific probe handler as follows:

static int uref_catch(struct uprobe *uprobe, struct upipe *upipe,
                      int event, va_list args)
{
    if (event != UPROBE_PROBE_UREF)
        return uprobe_throw_next(uprobe, upipe, event, args);

    va_list args_copy;
    va_copy(args_copy, args);
    unsigned int signature = va_arg(args_copy, unsigned int);
    va_end(args_copy);
    if (signature != UPIPE_PROBE_UREF_SIGNATURE)
        return uprobe_throw_next(uprobe, upipe, event, args);

    if (upipe_source != NULL) {
        /* release the source to exit */
        upipe_release(upipe_source);
        upipe_source = NULL;
        /* send demux output to /dev/null */
        upipe_set_output(upipe_split_output, upipe_null);
        upipe_release(upipe_split_output);
        upipe_split_output = NULL;
    } else {
        /* second (or after) frame, do not output them */
        upipe_set_output(upipe, upipe_null);
    }
    return UBASE_ERR_NONE;
}

The UPROBE_PROBE_UREF event is a custom event; it is mandatory for custom events to send the pipe's signature as the first argument, to make sure that we do not mistake it for another custom event from another pipe. So the magic here is that, as soon as we receive the first picture (first uref), we release the source. Another good idea is to send the output of the TS demux to a null pipe (a pipe that discards all input packets), so that we do not decode any following picture that could be output by the file source before it is finally deallocated (in Upipe, pipes process one entire uref at a time, and are protected from deletion for the duration of the processing of the uref; if a uref contains several "frames", it is therefore possible for the pipe to output several frames before being deallocated). The else clause is useful in case avcodec outputs several frames, which can happen if the codec reorders pictures (with B-frames, for instance).
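
The signature check relies on va_copy to peek at the first variadic argument without consuming it, so that the untouched argument list can still be forwarded if the event turns out not to be ours. Here is a standalone sketch of that technique; the signatures, error codes and function names are hypothetical.

```c
#include <assert.h>
#include <stdarg.h>
#include <stddef.h>

enum { TOY_ERR_NONE = 0, TOY_ERR_UNHANDLED = 1 };

#define TOY_SIGNATURE_A 0x41414141u  /* made-up pipe signatures */
#define TOY_SIGNATURE_B 0x42424242u

/* Handler for a local event whose arguments are (signature, int value).
 * Stores the value in *out only if the signature is the expected one. */
static int toy_catch_local(int *out, va_list args)
{
    assert(out != NULL);
    /* Peek at the signature on a copy, leaving `args` untouched so that
     * it could still be forwarded to another handler. */
    va_list args_copy;
    va_copy(args_copy, args);
    unsigned int signature = va_arg(args_copy, unsigned int);
    va_end(args_copy);
    if (signature != TOY_SIGNATURE_A)
        return TOY_ERR_UNHANDLED;

    /* The event is ours: consume the arguments for real. */
    (void)va_arg(args, unsigned int);
    *out = va_arg(args, int);
    return TOY_ERR_NONE;
}

/* Variadic entry point, standing in for the pipe that throws the event. */
static int toy_throw_local(int *out, ...)
{
    va_list args;
    va_start(args, out);
    int err = toy_catch_local(out, args);
    va_end(args);
    return err;
}
```

Every va_copy must be paired with a va_end on the copy, as uref_catch does above.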

The release of the file source pipe kills its idler pump. When there is no pump anymore, libev returns from the ev_loop call, and the process exits. However please note that if the file sink is blocked (for instance writing to a blocking pipe), a pump will be allocated, and libev will only return when the file sink has finished its job and killed its pump.

When the event loop is over, you should release all pipes to which you have kept a pointer, clean all custom probe structures, and release all managers. The example shows how to do that.

Global probes  

There is one more thing we haven't talked about. All pipes we allocated have an argument (or a subsequently allocated probe) uprobe_use(logger). The logger object is allocated at the beginning of the program as follows:

logger = uprobe_stdio_alloc(NULL, logstream, loglevel);
assert(logger != NULL);
logger = uprobe_uref_mgr_alloc(logger, uref_mgr);
assert(logger != NULL);
logger = uprobe_upump_mgr_alloc(logger, upump_mgr);
assert(logger != NULL);
logger = uprobe_ubuf_mem_alloc(logger, umem_mgr, UBUF_POOL_DEPTH,
                               UBUF_POOL_DEPTH);
assert(logger != NULL);

So it is a collection of probes that are passed to all pipes. The uprobe_stdio probe catches all log messages and prints them to a stdio handle (typically stderr). uprobe_uref_mgr and uprobe_upump_mgr catch UPROBE_NEED_UREF_MGR and UPROBE_NEED_UPUMP_MGR respectively, which can be thrown by pipes that need to allocate urefs or upumps. Finally, uprobe_ubuf_mem catches the UPROBE_NEW_FLOW_FORMAT event, and allocates a ubuf manager (a structure which allocates data buffers) according to the given flow format (block, picture or sound).

The uprobe_use call is necessary because when a probe is passed to a pipe (or to a lower-level probe), the recipient then owns the probe, and it wouldn't be possible to use the logger pointer anymore afterwards. The call increments the reference count of the logger object, so that logger can be used on other pipes as well. This implies that at the end of the process, logger itself must be released with uprobe_release.

Output probes  

Finally, we need to explain how the magic of connecting pipes together works, and the role of the uprobe_output probe. Upipe has two kinds of control messages:

  • flow format: describes the low-level format of the buffer (block/picture/sound, number of planes, size of samples, etc.)

  • flow definition: extends the flow format with more metadata (language, codec global headers, fps, sample_rate, etc.)

A pipe may only be the output of another if both agree on the flow format and flow definition. When the input pipe determines a new flow format, it throws a UPROBE_NEW_FLOW_FORMAT event. uprobe_output catches this event, and calls upipe_amend_flow_format on the output, so the output has the chance to modify the padding or alignment of buffers. Then it forwards the event upstream to a higher-level probe (such as uprobe_ubuf_mem), which allocates a struct ubuf_mgr, so the input can allocate buffers.

When the first buffer is ready to be output, the input pipe throws UPROBE_NEW_FLOW_DEF. uprobe_output catches the event, and calls upipe_set_flow_def on the output. If the output pipe returns an error, it is deallocated, and uprobe_output forwards the event upstream, so a new output pipe can be allocated (using upipe_void_alloc_output or upipe_flow_alloc_output).

So uprobe_output plays a major role in flow negotiations, and should be allocated for each pipe, except sinks (which have no output). Some pipes, like the TS demux, are technically sinks, because the output is handled by a subpipe for each elementary stream.

4.2 Pipe tutorial  

In this tutorial, we will write a pipe that accepts input packets, takes a number from an attribute, writes this number to a block ubuf, and outputs the new packet. Additionally, we will explore using a timer pump to trigger specific events.

The tutorial is heavily based on upipe_genaux.c.

Manager declaration  

In order to allow applications to allocate our pipe, we only need to provide a upipe_genaux_mgr_alloc function. The function returns a manager which will be used to allocate the pipes themselves.

static struct upipe_mgr upipe_genaux_mgr = {
    .refcount = NULL,
    .signature = UPIPE_GENAUX_SIGNATURE,

    .upipe_alloc = upipe_genaux_alloc,
    .upipe_input = upipe_genaux_input,
    .upipe_control = upipe_genaux_control,

    .upipe_mgr_control = NULL
};

struct upipe_mgr *upipe_genaux_mgr_alloc(void)
{
    return &upipe_genaux_mgr;
}

Though the function is called _alloc, there is actually no allocation performed; the manager is a static structure, without refcounting. In some cases it may be useful to allocate a dynamic structure, for instance when the allocation function takes an argument, or when the manager provides access to a shared resource - such as a PCI decoder or encoder card with a limited number of channels.

A manager declaration has four function pointers: allocation, input, control and manager control. All of them, except upipe_alloc, may be NULL. However, upipe_input may only be NULL if the pipe is a source; calling upipe_input on a pipe whose manager has no input function triggers an assertion failure.

Pipe declaration  

Here is how we declare our own private pipe structure:

struct upipe_genaux {
    struct urefcount urefcount;

    struct ubuf_mgr *ubuf_mgr;
    struct upipe *output;
    struct uref *flow_def;
    bool flow_def_sent;

    int (*getattr) (struct uref *, uint64_t *);

    struct upipe upipe;
};

The public structure is struct upipe. It contains a pointer to the manager, a pointer to a refcount structure (which may be NULL if the object is not refcounted), a struct uchain object and an opaque for use by the application, and a pointer to the probe hierarchy.

The struct urefcount object is more or less mandatory in order to properly deallocate the structure when all references to it are released. Usually, upipe->refcount will point to this object, except when fancy things are needed (like separating the reference count of the application from the references held by internal objects such as subpipes, which need their super-pipe to outlive them). Other objects may also have a refcount that points to it (for instance if you embed a probe for inner pipes).

We will need to allocate buffers (struct ubuf_mgr), we will output packets (so we need an output struct upipe). The flow_def and flow_def_sent fields are used by the output helper, and contain respectively a pointer to the flow definition packet, and a flag indicating if it has already been thrown (using upipe_throw_new_flow_def) or if it needs to be thrown at the next output packet.

The only specific field is the getattr function pointer, that will serve to get the attribute from incoming packets, so we can write it in the outgoing packets.

Pipe helpers  

Standard structures have helpers that manage them in a standard way:

UPIPE_HELPER_UPIPE(upipe_genaux, upipe, UPIPE_GENAUX_SIGNATURE);
UPIPE_HELPER_UREFCOUNT(upipe_genaux, urefcount, upipe_genaux_free)
UPIPE_HELPER_VOID(upipe_genaux);
UPIPE_HELPER_UBUF_MGR(upipe_genaux, ubuf_mgr, flow_def);
UPIPE_HELPER_OUTPUT(upipe_genaux, output, flow_def, flow_def_sent);

The first helper declares the functions upipe_genaux_to_upipe and upipe_genaux_from_upipe, which convert between the big structure and the embedded struct upipe; the latter takes advantage of container_of to get from the substructure back to the big structure. The second helper initializes the refcount so that the function upipe_genaux_free is called when the object is entirely released. The third helper declares functions that help write upipe_genaux_alloc. The fourth and fifth helpers declare functions to manage the ubuf manager and the output pipe.
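
For readers unfamiliar with the container_of idiom, here is a minimal standalone sketch of the conversion in both directions. The structures and the toy_container_of macro are hypothetical stand-ins; the code generated by UPIPE_HELPER_UPIPE may differ in its details.

```c
#include <assert.h>
#include <stddef.h>

/* Stand-in for the public structure (struct upipe in the tutorial). */
struct toy_public {
    int dummy;
};

/* Stand-in for the private structure, which embeds the public one. */
struct toy_private {
    int counter;
    struct toy_public pub;
};

/* Recover a pointer to the enclosing structure from a pointer to one of
 * its members, by subtracting the member's offset. */
#define toy_container_of(ptr, type, member) \
    ((type *)((char *)(ptr) - offsetof(type, member)))

/* Big structure to substructure: just take the member's address. */
static struct toy_public *toy_private_to_public(struct toy_private *priv)
{
    assert(priv != NULL);
    return &priv->pub;
}

/* Substructure back to big structure: the container_of direction. */
static struct toy_private *toy_private_from_public(struct toy_public *pub)
{
    assert(pub != NULL);
    return toy_container_of(pub, struct toy_private, pub);
}
```

This only works for pointers that really point inside a struct toy_private, which is why the private structure is always allocated by the pipe's own alloc function.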

Alloc and release  

The helpers need to be initialized and deinitialized:

static struct upipe *upipe_genaux_alloc(struct upipe_mgr *mgr,
                                        struct uprobe *uprobe,
                                        uint32_t signature, va_list args)
{
    struct upipe *upipe = upipe_genaux_alloc_void(mgr, uprobe, signature, args);
    if (unlikely(upipe == NULL))
        return NULL;

    struct upipe_genaux *upipe_genaux = upipe_genaux_from_upipe(upipe);
    upipe_genaux_init_urefcount(upipe);
    upipe_genaux_init_ubuf_mgr(upipe);
    upipe_genaux_init_output(upipe);
    upipe_genaux->getattr = uref_clock_get_cr_sys;
    upipe_throw_ready(upipe);
    return upipe;
}

static void upipe_genaux_free(struct upipe *upipe)
{
    upipe_dbg_va(upipe, "releasing pipe %p", upipe);
    upipe_throw_dead(upipe);

    upipe_genaux_clean_ubuf_mgr(upipe);
    upipe_genaux_clean_output(upipe);
    upipe_genaux_clean_urefcount(upipe);
    upipe_genaux_free_void(upipe);
}

upipe_genaux_alloc_void is declared by UPIPE_HELPER_VOID, allocates the big structure, and checks and uses the arguments (such as the probe hierarchy) passed to upipe_genaux_alloc. The other helpers are initialized, along with the only specific member (getattr). When the pipe is ready to accept control commands, it must throw the UPROBE_READY event.

Conversely, when the pipe no longer accepts control commands, it shall throw the UPROBE_DEAD event. After that point it is not allowed to throw other events (including log messages), which is why our call to upipe_dbg_va comes beforehand.

In Upipe, message logging is handled by the functions upipe_err, upipe_warn, upipe_notice, upipe_dbg, and upipe_verbose, in decreasing order of importance. Applications may elect to print only messages that match a certain importance criterion. These functions only accept a plain string, but all have a _va counterpart (like many other functions in Upipe) which takes a printf-style format string and additional arguments instead. Internally, message logging works by throwing a special UPROBE_LOG event, which is caught by the application or the standard struct uprobe_stdio probe.
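
The relationship between a plain-string logging function and its _va counterpart can be sketched as follows. This is a standalone illustration with hypothetical names and a made-up level enumeration; it does not reproduce the UPROBE_LOG event mechanism.

```c
#include <assert.h>
#include <stdarg.h>
#include <stdio.h>
#include <string.h>

/* Levels in increasing order of importance (made up for this sketch). */
enum toy_level { TOY_VERBOSE, TOY_DEBUG, TOY_NOTICE, TOY_WARNING, TOY_ERROR };

struct toy_logger {
    enum toy_level min_level;  /* messages below this level are dropped */
    char last[128];            /* last message kept, for demonstration */
};

static void toy_logger_init(struct toy_logger *logger,
                            enum toy_level min_level)
{
    assert(logger != NULL);
    memset(logger, 0, sizeof(*logger));
    logger->min_level = min_level;
}

/* Plain-string variant: records the message if it is important enough. */
static void toy_log(struct toy_logger *logger, enum toy_level level,
                    const char *msg)
{
    assert(logger != NULL && msg != NULL);
    if (level < logger->min_level)
        return;
    snprintf(logger->last, sizeof(logger->last), "%s", msg);
}

/* printf-style counterpart: formats, then delegates to the plain variant. */
static void toy_log_va(struct toy_logger *logger, enum toy_level level,
                       const char *format, ...)
{
    char buffer[128];
    va_list args;
    va_start(args, format);
    vsnprintf(buffer, sizeof(buffer), format, args);
    va_end(args);
    toy_log(logger, level, buffer);
}
```

Keeping the filtering in the plain variant means that both entry points apply the same importance criterion.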

upipe_genaux_free_void is declared by UPIPE_HELPER_VOID, releases the probes, and frees the big structure. Before that, the helpers must be deinitialized.

Input and output  

The input function is not mandatory, but in our case we process incoming buffers:

static void upipe_genaux_input(struct upipe *upipe, struct uref *uref,
                               struct upump **upump_p)
{
    struct upipe_genaux *upipe_genaux = upipe_genaux_from_upipe(upipe);
    uint64_t systime = 0;
    int size;
    struct ubuf *dst;
    uint8_t *aux;

    if (!ubase_check(upipe_genaux_check_ubuf_mgr(upipe)) ||
        !ubase_check(upipe_genaux->getattr(uref, &systime))) {
        uref_free(uref);
        return;
    }

    size = sizeof(uint64_t);
    dst = ubuf_block_alloc(upipe_genaux->ubuf_mgr, size);
    if (unlikely(dst == NULL)) {
        uref_free(uref);
        upipe_throw_fatal(upipe, UBASE_ERR_ALLOC);
        return;
    }
    ubuf_block_write(dst, 0, &size, &aux);
    upipe_genaux_hton64(aux, systime);
    ubuf_block_unmap(dst, 0);
    uref_attach_ubuf(uref, dst);
    upipe_genaux_output(upipe, uref, upump_p);
}

upipe_genaux_check_ubuf_mgr is declared by UPIPE_HELPER_UBUF_MGR; it checks whether a ubuf manager has already been set, and otherwise throws a UPROBE_NEW_FLOW_FORMAT event asking for one. The getattr function is supposed to have been set previously, and to store a uint64_t in its second argument.

We allocate a struct ubuf with ubuf_block_alloc, of the size of a uint64_t. We call ubuf_block_write to get a pointer to data (ubuf_block_alloc is supposed to return a contiguous memory space so there is no need to check for returned size), hton64 to write the buffer in network endianness, and ubuf_block_unmap to invalidate the aux pointer. There must be a ubuf_block_unmap for each ubuf_block_write or ubuf_block_read.
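
Writing a 64-bit value in network endianness simply means storing the most significant byte first, whatever the byte order of the host. A portable sketch of such a conversion, with hypothetical names (the actual upipe_genaux_hton64 may be written differently), could look like this:

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Store `value` into dst[0..7], most significant byte first. */
static void toy_hton64(uint8_t *dst, uint64_t value)
{
    assert(dst != NULL);
    for (int i = 0; i < 8; i++)
        dst[i] = (uint8_t)(value >> (56 - 8 * i));
}

/* Read back a big-endian 64-bit value from src[0..7]. */
static uint64_t toy_ntoh64(const uint8_t *src)
{
    assert(src != NULL);
    uint64_t value = 0;
    for (int i = 0; i < 8; i++)
        value = (value << 8) | src[i];
    return value;
}
```

Because the bytes are produced with shifts rather than by reinterpreting memory, the result is the same on little-endian and big-endian hosts.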

uref_attach_ubuf releases the previous ubuf attached to the uref (containing the picture for instance), and reattaches the ubuf we have just allocated.

upipe_genaux_output is declared by UPIPE_HELPER_OUTPUT, and outputs the packet to the next pipe. If needed, it also does the magic with flow definitions.

Controls  

The control function is not mandatory, but if we want to handle flow definitions we need one:

static int upipe_genaux_control(struct upipe *upipe, int command, va_list args)
{
    switch (command) {
        case UPIPE_ATTACH_UBUF_MGR:
            return upipe_genaux_attach_ubuf_mgr(upipe);

        case UPIPE_AMEND_FLOW_FORMAT:
            return UBASE_ERR_NONE;
        case UPIPE_GET_FLOW_DEF: {
            struct uref **p = va_arg(args, struct uref **);
            return upipe_genaux_get_flow_def(upipe, p);
        }
        case UPIPE_SET_FLOW_DEF: {
            struct uref *flow_def = va_arg(args, struct uref *);
            return upipe_genaux_set_flow_def(upipe, flow_def);
        }
        case UPIPE_GET_OUTPUT: {
            struct upipe **p = va_arg(args, struct upipe **);
            return upipe_genaux_get_output(upipe, p);
        }
        case UPIPE_SET_OUTPUT: {
            struct upipe *output = va_arg(args, struct upipe *);
            return upipe_genaux_set_output(upipe, output);
        }

        case UPIPE_GENAUX_SET_GETATTR: {
            UBASE_SIGNATURE_CHECK(args, UPIPE_GENAUX_SIGNATURE)
            return _upipe_genaux_set_getattr(upipe,
                    va_arg(args, int (*)(struct uref*, uint64_t*)));
        }
        case UPIPE_GENAUX_GET_GETATTR: {
            UBASE_SIGNATURE_CHECK(args, UPIPE_GENAUX_SIGNATURE)
            return _upipe_genaux_get_getattr(upipe,
                    va_arg(args, int (**)(struct uref*, uint64_t*)));
        }
        default:
            return UBASE_ERR_UNHANDLED;
    }
}

This function is generally just a big switch/case where we handle control commands one by one. UPIPE_ATTACH_UBUF_MGR asks the pipe to release the ubuf manager and throw a UPROBE_NEW_FLOW_FORMAT event again, so that a new ubuf manager is allocated. This can be useful if the downstream topology has changed and a pipe has special requirements on alignment or prepending/appending. There are attach commands for ubuf managers, uref managers (useful for source pipes), upump managers (useful for pipes that interact with the event loop) and uclock. Some structures are optional and the behaviour of the pipe changes depending on whether they are set or not (for instance uclock). upipe_genaux_attach_ubuf_mgr is provided by UPIPE_HELPER_UBUF_MGR.

UPIPE_AMEND_FLOW_FORMAT is called when the application needs to validate a new incoming flow format. With this control command you can typically change the flow format to improve the alignment, or add prepending/appending (picture and sound only). In our case we have no requirement on input ubufs.

UPIPE_GET_FLOW_DEF is supposed to return our output flow definition. UPIPE_HELPER_OUTPUT provides a generic implementation. Same goes for UPIPE_GET_OUTPUT and UPIPE_SET_OUTPUT.

UPIPE_SET_FLOW_DEF tells our pipe which input flow definition it must expect. It is covered in the next section.

Finally, UPIPE_GENAUX_SET_GETATTR and UPIPE_GENAUX_GET_GETATTR show an example of local control commands. The first argument of such control commands must be the signature of the pipe, which must be checked with UBASE_SIGNATURE_CHECK.
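
The shape of such local commands can be reproduced in a standalone sketch: a control function reads the signature first, then the command-specific argument. All names, command numbers and the signature value are hypothetical, and returning "unhandled" on a signature mismatch is a choice made for this sketch. A typedef is used for the function pointer type so that the va_arg calls stay readable.

```c
#include <assert.h>
#include <stdarg.h>
#include <stddef.h>
#include <stdint.h>

enum { TOY_ERR_NONE = 0, TOY_ERR_UNHANDLED = 1, TOY_ERR_INVALID = 2 };
enum { TOY_SET_GETATTR = 0x8000, TOY_GET_GETATTR };  /* local commands */
#define TOY_SIGNATURE 0x746f7921u                    /* made-up signature */

typedef int (*toy_getattr_fn)(const void *packet, uint64_t *value_p);

struct toy_pipe {
    toy_getattr_fn getattr;
};

static int toy_control(struct toy_pipe *pipe, int command, va_list args)
{
    assert(pipe != NULL);
    switch (command) {
    case TOY_SET_GETATTR:
        /* local command: the first argument must be our signature */
        if (va_arg(args, unsigned int) != TOY_SIGNATURE)
            return TOY_ERR_UNHANDLED;
        pipe->getattr = va_arg(args, toy_getattr_fn);
        return TOY_ERR_NONE;
    case TOY_GET_GETATTR: {
        if (va_arg(args, unsigned int) != TOY_SIGNATURE)
            return TOY_ERR_UNHANDLED;
        toy_getattr_fn *p = va_arg(args, toy_getattr_fn *);
        if (p == NULL)
            return TOY_ERR_INVALID;
        *p = pipe->getattr;
        return TOY_ERR_NONE;
    }
    default:
        return TOY_ERR_UNHANDLED;
    }
}

/* Variadic front end, standing in for a generic "send command" call. */
static int toy_send(struct toy_pipe *pipe, int command, ...)
{
    va_list args;
    va_start(args, command);
    int err = toy_control(pipe, command, args);
    va_end(args);
    return err;
}

/* Example attribute getter used to exercise the commands. */
static int toy_get_constant(const void *packet, uint64_t *value_p)
{
    (void)packet;
    *value_p = 1234;
    return TOY_ERR_NONE;
}
```

A caller would write toy_send(&pipe, TOY_SET_GETATTR, TOY_SIGNATURE, toy_get_constant); in practice such calls are usually wrapped in small inline functions so that the signature cannot be forgotten.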

Flow definition handling  

This function is almost mandatory in all pipes that accept an input:

static int upipe_genaux_set_flow_def(struct upipe *upipe, struct uref *flow_def)
{
    if (flow_def == NULL)
        return UBASE_ERR_INVALID;
    struct uref *flow_def_dup;
    if ((flow_def_dup = uref_dup(flow_def)) == NULL)
        return UBASE_ERR_ALLOC;
    if (unlikely(!ubase_check(uref_flow_set_def(flow_def_dup, "block.aux."))))
        upipe_throw_fatal(upipe, UBASE_ERR_ALLOC);
    upipe_genaux_store_flow_def(upipe, flow_def_dup);
    return UBASE_ERR_NONE;
}

It must return UBASE_ERR_NONE if the input flow definition is compatible with our pipe, and accepted; otherwise an error code is returned. UPIPE_SET_FLOW_DEF may be called at any point in the life of a pipe; the input flow definition may change at any moment. It is legal for a pipe to refuse to change a flow definition that would otherwise be compatible; in that case the application must deallocate/reallocate the same pipe type. It is mandatory to call UPIPE_SET_FLOW_DEF before inputting any packet.

We accept all kinds of input flow definition. However, at the same time, we allocate our output flow definition and change the definition attribute. upipe_genaux_store_flow_def is provided by UPIPE_HELPER_OUTPUT.
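
A pipe that is pickier than genaux would reject incompatible definitions. Since the definition strings seen in this chapter are dotted prefixes ("block.aux.", "block.mjpeg.pic."), a compatibility test can be sketched as a prefix comparison. The functions below are hypothetical and operate on plain strings rather than on struct uref.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

enum { TOY_ERR_NONE = 0, TOY_ERR_INVALID = 2 };

/* True if the definition string `def` starts with `prefix`. */
static bool toy_flow_match(const char *def, const char *prefix)
{
    assert(def != NULL && prefix != NULL);
    return strncmp(def, prefix, strlen(prefix)) == 0;
}

/* Hypothetical set_flow_def of a pipe that only accepts block flows. */
static int toy_set_flow_def(const char *def)
{
    if (def == NULL || !toy_flow_match(def, "block."))
        return TOY_ERR_INVALID;
    return TOY_ERR_NONE;
}
```

With this rule, "block.mjpeg.pic." is accepted while "pic." is refused, and the caller then has to allocate another kind of pipe for that flow.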

Using a pump  

The pipe we have written is pretty much finished (and the upipe_genaux.c file doesn't do anything else). However, for the purpose of the tutorial, let's suppose that we want to throw the UPROBE_SOURCE_END event after a given timeout. We need to work with a upump manager, which represents the event loop of the thread. Fortunately there is the UPIPE_HELPER_UPUMP_MGR that works in pretty much the same way as the UPIPE_HELPER_UBUF_MGR we've just seen. Then, as soon as we have a upump manager (an example can be seen in upipe_file_source.c), we allocate a timer:

struct upump *upump = upump_alloc_timer(upipe_genaux->upump_mgr,
                                        upipe_genaux_timer, upipe,
                                        5 * UCLOCK_FREQ, 0);
upump_start(upump);

The callback upipe_genaux_timer will trigger once, after 5 seconds, and never again; it may look like this:

static void upipe_genaux_timer(struct upump *upump)
{
    struct upipe *upipe = upump_get_opaque(upump, struct upipe *);
    upipe_throw_source_end(upipe);
    upump_free(upump);
}

You may want to store pumps in your pipe structure, and Upipe provides UPIPE_HELPER_UPUMP for that. Pumps need to be freed before the pipe is deallocated, otherwise the opaque pointer of the pump is left dangling.
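
To illustrate the difference between a one-shot timer (repeat set to 0, as above) and a periodic one, here is a toy timer driven by a simulated clock. It is not the upump API: the structure and functions are hypothetical, and the 27 MHz tick rate is an assumption made for the sketch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define TOY_CLOCK_FREQ UINT64_C(27000000)  /* assumed ticks per second */

struct toy_timer {
    uint64_t next;       /* absolute time of the next expiry, in ticks */
    uint64_t repeat;     /* period in ticks, or 0 for a one-shot timer */
    bool active;
    unsigned int fired;  /* number of times the timer has expired */
};

/* Arm the timer to expire `after` ticks from `now`. */
static void toy_timer_start(struct toy_timer *timer, uint64_t now,
                            uint64_t after, uint64_t repeat)
{
    assert(timer != NULL);
    timer->next = now + after;
    timer->repeat = repeat;
    timer->active = true;
    timer->fired = 0;
}

/* Advance the simulated clock to `now`, firing the timer as needed. */
static void toy_timer_poll(struct toy_timer *timer, uint64_t now)
{
    assert(timer != NULL);
    while (timer->active && now >= timer->next) {
        timer->fired++;
        if (timer->repeat == 0)
            timer->active = false;  /* one-shot: never fires again */
        else
            timer->next += timer->repeat;
    }
}
```

Polled at the 20-second mark, a timer started with a 5-second delay and no repeat has fired once, whereas the same timer with a 5-second repeat has fired four times.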

Generated by cmassiot on Thu Nov 6 12:15:18 2014 using MkDoc