4 Tutorials
4.1 Application tutorial
In this tutorial, we will write an application that builds a pipeline featuring a file source, a TS demux, an avcodec decoder, an avcodec JPEG encoder, and a file sink. The goal of the application is to extract the first picture from a TS file and write it to disk as a JPEG. The full source code is in the examples/extract_pic.c file.
Upipe environment
First we start with setting up the global structures required by most Upipe modules:
struct ev_loop *loop = ev_default_loop(0);
struct upump_mgr *upump_mgr = upump_ev_mgr_alloc(loop, UPUMP_POOL, UPUMP_BLOCKER_POOL);
struct umem_mgr *umem_mgr = umem_alloc_mgr_alloc();
struct udict_mgr *udict_mgr = udict_inline_mgr_alloc(UDICT_POOL_DEPTH, umem_mgr, -1, -1);
struct uref_mgr *uref_mgr = uref_std_mgr_alloc(UREF_POOL_DEPTH, udict_mgr, 0);
The first two lines set up the event loop of our main thread; in this example we use the libev library. The umem_mgr, udict_mgr and uref_mgr structures are the managers that Upipe modules use to allocate memory chunks, attribute dictionaries and urefs (packets), respectively.
File source
We then start building the pipeline from source to sink, beginning with the file source module:
struct upipe_mgr *upipe_fsrc_mgr = upipe_fsrc_mgr_alloc();
upipe_source = upipe_void_alloc(upipe_fsrc_mgr,
uprobe_pfx_alloc(uprobe_use(logger),
loglevel, "fsrc"));
assert(upipe_source != NULL);
upipe_mgr_release(upipe_fsrc_mgr);
if (!ubase_check(upipe_set_uri(upipe_source, srcpath)))
exit(EXIT_FAILURE);
In Upipe, each object is attached to a "manager", which is used to allocate objects and perform commands on them. The manager is also useful when there is some kind of global context to maintain, such as the handle of a unique hardware resource. So the first line allocates the manager for file sources. The second line allocates a pipe object. upipe_void_alloc is the function used to allocate pipes which require no extra argument (such as the output format of the pipe). Its first argument is the manager to use, and the second specifies a list of "probes" to attach to the pipe. Probes are used to handle events and log messages, and can be viewed as a hierarchy - lower-level probes are called before higher-level probes. In this case, we allocate a uprobe_pfx probe, whose purpose is to prepend "[fsrc] " to every log message. The next probe after uprobe_pfx is the global probe "logger", whose refcount we have to increment via uprobe_use, and whose purpose is to print messages to stderr (more information later).
After the pipe has been allocated, there is no need for the file source manager anymore, so we can release it. In Upipe, most structures are refcounted so the object is not actually freed - it is still in use by the upipe_source object. It will only be freed when upipe_source is released. But the upipe_fsrc_mgr pointer should no longer be accessed after this point.
The last command we send, upipe_set_uri, sends a control command to the file source pipe, asking it to open the file srcpath. It returns an error code that should be UBASE_ERR_NONE if the file was opened, or any other documented error.
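The refcounting rule described above for upipe_fsrc_mgr can be illustrated with a tiny stand-alone model. This is only a sketch: struct obj, obj_use and obj_release are hypothetical names invented for the illustration, not Upipe's API, and Upipe's own refcount machinery is more elaborate.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical refcounted object; not a Upipe structure. */
struct obj {
    unsigned int refs;   /* number of owners */
    bool freed;          /* set when the last owner lets go */
};

static void obj_init(struct obj *o)
{
    o->refs = 1;         /* the allocator holds the first reference */
    o->freed = false;
}

/* Take an additional reference and return the same pointer. */
static struct obj *obj_use(struct obj *o)
{
    assert(!o->freed);
    o->refs++;
    return o;
}

/* Drop one reference; the object dies with the last one. */
static void obj_release(struct obj *o)
{
    assert(o->refs > 0);
    if (--o->refs == 0)
        o->freed = true; /* a real implementation would free memory here */
}

/* Mirrors the tutorial: the "pipe" keeps the manager alive after the
 * application has released its own pointer. Returns true if the manager
 * survived until the pipe dropped it. */
static bool manager_outlives_app_pointer(void)
{
    struct obj mgr;
    obj_init(&mgr);                           /* application allocates */
    struct obj *held_by_pipe = obj_use(&mgr); /* pipe takes a reference */
    obj_release(&mgr);                        /* application lets go */
    bool alive_after_app_release = !mgr.freed;
    obj_release(held_by_pipe);                /* pipe is released later */
    return alive_after_app_release && mgr.freed;
}
```

In this model the application's release only decrements the counter; the object is destroyed when the last holder (here the pipe) releases it.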
TS demux
At this point we have opened a file, and as it is supposed to be a TS file, we would like to allocate a demux.
struct upipe_mgr *upipe_ts_demux_mgr = upipe_ts_demux_mgr_alloc();
struct upipe_mgr *upipe_mpgvf_mgr = upipe_mpgvf_mgr_alloc();
upipe_ts_demux_mgr_set_mpgvf_mgr(upipe_ts_demux_mgr, upipe_mpgvf_mgr);
upipe_mgr_release(upipe_mpgvf_mgr);
struct upipe_mgr *upipe_h264f_mgr = upipe_h264f_mgr_alloc();
upipe_ts_demux_mgr_set_h264f_mgr(upipe_ts_demux_mgr, upipe_h264f_mgr);
upipe_mgr_release(upipe_h264f_mgr);
struct upipe *ts_demux = upipe_void_alloc_output(upipe_source, upipe_ts_demux_mgr,
uprobe_pfx_alloc(
uprobe_selflow_alloc(uprobe_use(logger),
uprobe_selflow_alloc(uprobe_use(logger), &uprobe_catch,
UPROBE_SELFLOW_PIC, "auto"),
UPROBE_SELFLOW_VOID, "auto"),
loglevel, "tsdemux"));
assert(ts_demux != NULL);
upipe_mgr_release(upipe_ts_demux_mgr);
upipe_release(ts_demux);
Again, we start by allocating the TS demux manager. Then we allocate other managers for framers (parsers for MPEG-2 and MPEG-4 AVC video formats), which are delivered in separate libraries, and we tell the TS demux manager about them. Then we allocate the TS demux as the output of our file source. When it is done we can release the ts_demux pointer as we won't need to access it again - this doesn't release the object, as it is still in use by the source pipe.
The TS demux is allocated with a collection of probes - uprobe_pfx (which was already discussed) and uprobe_selflow. We'll talk about this one later. Then we enter the process's main event loop:
ev_loop(loop, 0);
This is a libev instruction that starts watchers and triggers callbacks.
Elementary stream selection
So what happens now? The file source pipe has allocated an idler pump, that is called immediately to read data from the file. The data is then fed to the TS demux, which starts by looking for a PAT (a structure containing the list of programs in the file). When a PAT is found, it emits a UPROBE_SPLIT_UPDATE event, and the list of programs can be read via upipe_split_iterate. We have a probe that does just that: iterate on the list of programs, and select the one we'd like to read. This is why we passed that to the TS demux:
uprobe_pfx_alloc(
uprobe_selflow_alloc(uprobe_use(logger), [...],
UPROBE_SELFLOW_VOID, "auto"),
loglevel, "tsdemux"));
UPROBE_SELFLOW_VOID indicates that we are looking for a program, and "auto" means to select the first available program (otherwise you can specify a list of comma-separated program numbers). This probe, on the first detected program, allocates a "subpipe", an object with a struct upipe API which indicates our interest in the program. In turn, the subpipe looks for a PMT (a structure containing the list of elementary streams in the program), and emits UPROBE_SPLIT_UPDATE. Then another uprobe_selflow selects the first video elementary stream:
uprobe_selflow_alloc(uprobe_use(logger), &uprobe_catch,
UPROBE_SELFLOW_PIC, "auto"),
Decoding pipeline
In all probes, the first argument is the next probe to call if the event wasn't caught. For selflow, the second argument is the probe to pass to subpipes when they are allocated. In that second case, we pass &uprobe_catch, an application-defined probe initialized earlier:
struct uprobe uprobe_catch;
uprobe_init(&uprobe_catch, split_catch, uprobe_use(logger));
Where split_catch is a callback:
static int split_catch(struct uprobe *uprobe, struct upipe *upipe,
int event, va_list args)
{
if (event != UPROBE_NEED_OUTPUT)
return uprobe_throw_next(uprobe, upipe, event, args);
struct upipe *avcdec = upipe_void_alloc_output(upipe, upipe_avcdec_mgr,
uprobe_pfx_alloc_va(uprobe_use(&uprobe_avcdec),
loglevel, "avcdec"));
if (avcdec == NULL) {
upipe_err_va(upipe, "incompatible flow def");
upipe_release(upipe_source);
return UBASE_ERR_UNHANDLED;
}
upipe_release(avcdec);
return UBASE_ERR_NONE;
}
This probe only catches UPROBE_NEED_OUTPUT, which is thrown when the first picture out of the mpgv or h264 framer is sent. We allocate an avcodec decoder.
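The "catch it or pass it to the next probe" pattern used by split_catch can be modelled in a few lines of plain C. The sketch below is a simplified stand-in, not Upipe code: struct probe, probe_throw, probe_throw_next and the event and error constants are all hypothetical names chosen for the illustration.

```c
#include <assert.h>
#include <stddef.h>

enum { ERR_NONE = 0, ERR_UNHANDLED = 1 };      /* hypothetical error codes */
enum { EVENT_LOG = 1, EVENT_NEED_OUTPUT = 2 }; /* hypothetical events */

struct probe;
typedef int (*probe_cb)(struct probe *probe, int event);

struct probe {
    probe_cb cb;          /* handler for this level */
    struct probe *next;   /* higher-level probe, or NULL at the top */
    int caught;           /* number of events this probe handled */
};

/* Offer the event to this probe; past the top of the chain nobody is
 * left to handle it. */
static int probe_throw(struct probe *probe, int event)
{
    if (probe == NULL)
        return ERR_UNHANDLED;
    assert(probe->cb != NULL);
    return probe->cb(probe, event);
}

/* Forward an event to the next probe in the chain. */
static int probe_throw_next(struct probe *probe, int event)
{
    return probe_throw(probe->next, event);
}

/* Low-level probe: only interested in EVENT_NEED_OUTPUT. */
static int catch_need_output(struct probe *probe, int event)
{
    if (event != EVENT_NEED_OUTPUT)
        return probe_throw_next(probe, event);
    probe->caught++;
    return ERR_NONE;
}

/* Top-level probe: plays the role of the logger. */
static int catch_log(struct probe *probe, int event)
{
    if (event != EVENT_LOG)
        return probe_throw_next(probe, event);
    probe->caught++;
    return ERR_NONE;
}
```

Chaining a catch_need_output probe in front of a catch_log probe reproduces the ordering described in the text: the lower-level probe sees every event first and only the events it does not recognize travel up to the logger.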
Video processing pipeline
The avcodec decoder features a special probe, defined as follows:
static int avcdec_catch(struct uprobe *uprobe, struct upipe *upipe,
int event, va_list args)
{
if (event != UPROBE_NEED_OUTPUT)
return uprobe_throw_next(uprobe, upipe, event, args);
struct uref *flow_def = va_arg(args, struct uref *);
uint64_t hsize, vsize, wanted_hsize;
struct urational sar;
bool progressive;
if (unlikely(!ubase_check(uref_pic_flow_get_hsize(flow_def, &hsize)) ||
!ubase_check(uref_pic_flow_get_vsize(flow_def, &vsize)) ||
!ubase_check(uref_pic_flow_get_sar(flow_def, &sar)))) {
upipe_err_va(upipe, "incompatible flow def");
upipe_release(upipe_source);
return UBASE_ERR_UNHANDLED;
}
wanted_hsize = (hsize * sar.num / sar.den / 2) * 2;
progressive = ubase_check(uref_pic_get_progressive(flow_def));
struct uref *flow_def2 = uref_dup(flow_def);
upipe_use(upipe);
if (!progressive) {
uref_pic_set_progressive(flow_def2);
struct upipe *deint = upipe_void_alloc_output(upipe,
upipe_filter_blend_mgr,
uprobe_pfx_alloc(uprobe_use(logger),
loglevel, "deint"));
assert(deint != NULL);
upipe_release(upipe);
upipe = deint;
}
if (wanted_hsize != hsize) {
uref_pic_flow_set_hsize(flow_def2, wanted_hsize);
struct upipe *sws = upipe_flow_alloc_output(upipe, upipe_sws_mgr,
uprobe_pfx_alloc_va(uprobe_use(logger),
loglevel, "sws"), flow_def2);
if (sws == NULL) {
/* report the error while we still hold a reference on upipe */
upipe_err_va(upipe, "incompatible flow def");
upipe_release(upipe);
uref_free(flow_def2);
upipe_release(upipe_source);
return UBASE_ERR_UNHANDLED;
}
upipe_release(upipe);
upipe = sws;
}
uref_pic_flow_clear_format(flow_def2);
uref_flow_set_def(flow_def2, "block.mjpeg.pic.");
struct upipe *jpegenc = upipe_flow_alloc_output(upipe, upipe_avcenc_mgr,
uprobe_pfx_alloc_va(uprobe_use(logger),
loglevel, "jpeg"), flow_def2);
assert(jpegenc != NULL);
upipe_release(upipe);
upipe_avcenc_set_option(jpegenc, "qmax", "2");
upipe = jpegenc;
struct upipe *urefprobe = upipe_void_alloc_output(upipe,
upipe_probe_uref_mgr,
uprobe_pfx_alloc_va(uprobe_use(&uprobe_uref),
loglevel, "urefprobe"));
assert(urefprobe != NULL);
upipe_release(upipe);
upipe = urefprobe;
struct upipe *fsink = upipe_void_alloc_output(upipe, upipe_fsink_mgr,
uprobe_pfx_alloc_va(uprobe_use(logger), loglevel,
"jpegsink"));
assert(fsink != NULL);
upipe_release(upipe);
upipe_fsink_set_path(fsink, dstpath, UPIPE_FSINK_OVERWRITE);
upipe = fsink;
uref_free(flow_def2);
upipe_release(upipe);
return UBASE_ERR_NONE;
}
This probe does the hard work. It allocates a deinterlacer and a scaler (both on an as-needed basis), a JPEG encoder, a uref probe (more on this later) and a file sink. The pipes are released after allocation, so that a single call to upipe_release on the avcodec decoder releases the rest of the pipeline. Note how we use a second flow definition packet (flow_def2) and upipe_flow_alloc_output to specify the output format of the scaler and the JPEG encoder.
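The wanted_hsize expression in the probe converts the coded width to square pixels using the sample aspect ratio, then rounds down to an even number. A small stand-alone sketch of that arithmetic follows; struct rational is a hypothetical stand-in for struct urational, and its field types are an assumption made for the example.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-in for a rational number (numerator/denominator). */
struct rational {
    int64_t num;
    uint64_t den;
};

/* Width in square pixels, rounded down to an even number, using the
 * same integer expression as the probe. */
static uint64_t square_pixel_width(uint64_t hsize, struct rational sar)
{
    assert(sar.den != 0 && sar.num > 0);
    return (hsize * (uint64_t)sar.num / sar.den / 2) * 2;
}
```

For instance, a 720-pixel-wide picture with a 16:15 sample aspect ratio gives 720 * 16 / 15 = 768, and a 1440-pixel-wide picture with a 4:3 sample aspect ratio gives 1920. When the result equals hsize (square pixels), the scaler is not needed.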
End of processing
We said we wanted to output only the first frame, and quit afterwards. This magic is performed with the "urefprobe" pipe, allocated in the avcodec probe. This type of pipe throws a custom event (UPROBE_PROBE_UREF) for each incoming uref. Then we just need to write a specific probe handler as follows:
static int uref_catch(struct uprobe *uprobe, struct upipe *upipe,
int event, va_list args)
{
if (event != UPROBE_PROBE_UREF)
return uprobe_throw_next(uprobe, upipe, event, args);
va_list args_copy;
va_copy(args_copy, args);
unsigned int signature = va_arg(args_copy, unsigned int);
va_end(args_copy);
if (signature != UPIPE_PROBE_UREF_SIGNATURE)
return uprobe_throw_next(uprobe, upipe, event, args);
if (upipe_source != NULL) {
/* release the source to exit */
upipe_release(upipe_source);
upipe_source = NULL;
/* send demux output to /dev/null */
upipe_set_output(upipe_split_output, upipe_null);
upipe_release(upipe_split_output);
upipe_split_output = NULL;
} else {
/* second (or after) frame, do not output them */
upipe_set_output(upipe, upipe_null);
}
return UBASE_ERR_NONE;
}
The UPROBE_PROBE_UREF event is a custom event; custom events must carry the pipe's signature as their first argument, so that we do not mistake them for custom events from another pipe. The trick here is that as soon as we receive the first picture (first uref), we release the source. It is also a good idea to send the output of the TS demux to a null pipe (a pipe that discards all input packets), so that we do not decode any further pictures that the file source may output before it is finally deallocated. In Upipe, pipes process one entire uref at a time, and are protected from deletion while they process it; if a uref contains several frames, the pipe may therefore output several frames before being deallocated. The else clause is useful in case avcodec outputs several frames, which can happen when the codec reorders frames (for instance with B-frames).
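The reason uref_catch reads the signature through va_copy is that the original argument list must stay untouched in case the event has to be forwarded. The following sketch shows the idea with plain <stdarg.h>; the signature values and the function names are hypothetical and only serve the illustration.

```c
#include <assert.h>
#include <stdarg.h>

#define MY_SIGNATURE    0x6d797369u /* hypothetical signature of "our" pipe */
#define OTHER_SIGNATURE 0x6f746872u /* hypothetical signature of another pipe */

enum { HANDLED_HERE = 0, PASSED_ON = 1 };

/* Stand-in for the next probe: it reads the arguments from the start,
 * which only works because the caller did not consume them. */
static int next_probe(va_list args)
{
    unsigned int signature = va_arg(args, unsigned int);
    assert(signature != MY_SIGNATURE);
    (void)signature;
    return PASSED_ON;
}

/* Peek at the signature through a copy, then either handle the event or
 * forward the untouched argument list. */
static int catch_custom(va_list args)
{
    va_list args_copy;
    va_copy(args_copy, args);
    unsigned int signature = va_arg(args_copy, unsigned int);
    va_end(args_copy);
    if (signature != MY_SIGNATURE)
        return next_probe(args);
    return HANDLED_HERE;
}

/* Variadic entry point, playing the role of the pipe throwing an event. */
static int throw_custom(int event, ...)
{
    va_list args;
    va_start(args, event);
    int ret = catch_custom(args);
    va_end(args);
    return ret;
}
```

Calling throw_custom(1, MY_SIGNATURE) is handled locally, while throw_custom(1, OTHER_SIGNATURE) reaches next_probe, which can still read the signature as its first argument.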
The release of the file source pipe kills its idler pump. When there is no pump anymore, libev returns from the ev_loop call, and the process exits. However please note that if the file sink is blocked (for instance writing to a blocking pipe), a pump will be allocated, and libev will only return when the file sink has finished its job and killed its pump.
When the event loop is over, you should release all pipes to which you have kept a pointer, clean all custom probe structures, and release all managers. The example shows how to do that.
Global probes
There is one more thing we haven't talked about. All pipes we allocated have an argument (or a subsequently allocated probe) uprobe_use(logger). The logger object is allocated at the beginning of the program as follows:
logger = uprobe_stdio_alloc(NULL, logstream, loglevel);
assert(logger != NULL);
logger = uprobe_uref_mgr_alloc(logger, uref_mgr);
assert(logger != NULL);
logger = uprobe_upump_mgr_alloc(logger, upump_mgr);
assert(logger != NULL);
logger = uprobe_ubuf_mem_alloc(logger, umem_mgr, UBUF_POOL_DEPTH,
UBUF_POOL_DEPTH);
assert(logger != NULL);
So it is a collection of probes that is passed to all pipes. The uprobe_stdio probe catches all log messages and prints them to a stdio stream (typically stderr). The other probes catch UPROBE_PROVIDE_REQUEST events, through which pipes request the data structures they need (here a uref manager, a upump manager and a ubuf manager).
The uprobe_use call is necessary because when passing a probe to a pipe (or a lower-level probe), it then owns the probe, and it wouldn't be possible to use the logger pointer anymore afterwards. The call increments the reference count of the logger object, so that logger can be used on other pipes as well. This implies that at the end of the process, logger must be released:
uprobe_release(logger);
4.2 Pipe tutorial
In this tutorial, we will write a pipe that accepts input packets, takes a number from an attribute, writes this number to a block ubuf, and outputs the new packet. Additionally, we will explore using a timer pump to trigger specific events.
The tutorial is heavily based on upipe_genaux.c.
Manager declaration
In order to allow applications to allocate our pipe, we only need to provide a upipe_genaux_mgr_alloc function. The function returns a manager which will be used to allocate the pipes themselves.
static struct upipe_mgr upipe_genaux_mgr = {
.refcount = NULL,
.signature = UPIPE_GENAUX_SIGNATURE,
.upipe_alloc = upipe_genaux_alloc,
.upipe_input = upipe_genaux_input,
.upipe_control = upipe_genaux_control,
.upipe_mgr_control = NULL
};
struct upipe_mgr *upipe_genaux_mgr_alloc(void)
{
return &upipe_genaux_mgr;
}
Though the function is called _alloc, there is actually no allocation performed; the manager is a static structure, without refcounting. In some cases it may be interesting to allocate a dynamic structure, for instance when the allocation function takes an argument, or when the manager provides access to a shared resource - such as a PCI decoder or encoder card with a limited number of channels.
When declaring a manager, you supply four function pointers: allocation, input, control and manager control. All of them, except upipe_alloc, may be NULL. However, upipe_input may only be NULL if the pipe is a source; otherwise, calling upipe_input on such a pipe will trigger an assert().
Pipe declaration
Here is how we declare our own private pipe structure:
struct upipe_genaux {
struct urefcount urefcount;
struct ubuf_mgr *ubuf_mgr;
struct urequest ubuf_mgr_request;
struct upipe *output;
struct uref *flow_def;
enum upipe_helper_output_state output_state;
struct uchain request_list;
struct uchain urefs;
unsigned int nb_urefs;
unsigned int max_urefs;
struct uchain blockers;
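/* function used to read the attribute from incoming packets */
int (*getattr)(struct uref *, uint64_t *);
struct upipe upipe;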
};
The public structure is struct upipe. It contains a pointer to the manager, a pointer to a refcount structure (which may be NULL if the object is not refcounted), a struct uchain object and an opaque for use by the application, and a pointer to the probe hierarchy.
The struct urefcount object is more or less mandatory in order to properly deallocate the structure when all references to it are released. Usually, upipe->refcount will point to this object, except when fancy things are needed (like keeping the references held by the application separate from the references held by internal objects such as subpipes, which need their super-pipe to outlive them). Other objects may also have a refcount that points to it (for instance if you embed a probe for inner pipes).
We will need to allocate buffers (struct ubuf_mgr), and we will output packets (so we need an output struct upipe). The ubuf_mgr_request field is used by the ubuf_mgr helper, and the flow_def, output_state and request_list fields are used by the output helper. The urefs, nb_urefs, max_urefs and blockers fields are used by the input helper, which buffers incoming packets. We are going to need to buffer packets between the moment we ask for a ubuf manager, and the moment it is provided to us (the internal events are asynchronous).
The only specific field is the getattr function pointer, that will serve to get the attribute from incoming packets, so we can write it in the outgoing packets.
Pipe helpers
Standard structures have helpers that manage them in a standard way:
UPIPE_HELPER_UPIPE(upipe_genaux, upipe, UPIPE_GENAUX_SIGNATURE);
UPIPE_HELPER_UREFCOUNT(upipe_genaux, urefcount, upipe_genaux_free)
UPIPE_HELPER_VOID(upipe_genaux);
UPIPE_HELPER_OUTPUT(upipe_genaux, output, flow_def, output_state, request_list);
UPIPE_HELPER_UBUF_MGR(upipe_genaux, ubuf_mgr, ubuf_mgr_request,
upipe_genaux_check,
upipe_genaux_register_output_request,
upipe_genaux_unregister_output_request)
UPIPE_HELPER_INPUT(upipe_genaux, urefs, nb_urefs, max_urefs, blockers, upipe_genaux_handle)
The first helper declares the functions upipe_genaux_to_upipe and upipe_genaux_from_upipe, which take advantage of container_of to convert between the embedded struct upipe and the enclosing private structure. The second helper sets up the refcount so that the function upipe_genaux_free is called when the object is entirely released. The third helper declares functions that help write upipe_genaux_alloc. The remaining ones declare functions to manage the output pipe, the ubuf manager, and the buffering of packets.
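The container_of technique mentioned above can be sketched in portable C with offsetof. The structures and function names below are hypothetical stand-ins for struct upipe, struct upipe_genaux and the generated accessors; the real helper macro may be written differently.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical public structure, standing in for struct upipe. */
struct pub {
    int opaque;
};

/* Hypothetical private structure embedding the public one. */
struct priv {
    int private_state;
    struct pub pub;
};

/* Public -> private: subtract the offset of the embedded member. */
static struct priv *priv_from_pub(struct pub *pub)
{
    assert(pub != NULL);
    return (struct priv *)((char *)pub - offsetof(struct priv, pub));
}

/* Private -> public: take the address of the embedded member. */
static struct pub *priv_to_pub(struct priv *priv)
{
    return &priv->pub;
}
```

Because the conversion relies on the member offset, the public structure does not have to be the first field of the private one, which matches the layout of struct upipe_genaux where struct upipe comes last.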
Alloc and release
The helpers need to be initialized and deinitialized:
static struct upipe *upipe_genaux_alloc(struct upipe_mgr *mgr,
struct uprobe *uprobe,
uint32_t signature, va_list args)
{
struct upipe *upipe = upipe_genaux_alloc_void(mgr, uprobe, signature, args);
if (unlikely(upipe == NULL))
return NULL;
struct upipe_genaux *upipe_genaux = upipe_genaux_from_upipe(upipe);
upipe_genaux_init_urefcount(upipe);
upipe_genaux_init_ubuf_mgr(upipe);
upipe_genaux_init_output(upipe);
upipe_genaux_init_input(upipe);
upipe_genaux->getattr = uref_clock_get_cr_sys;
upipe_throw_ready(upipe);
return upipe;
}
static void upipe_genaux_free(struct upipe *upipe)
{
upipe_dbg_va(upipe, "releasing pipe %p", upipe);
upipe_throw_dead(upipe);
upipe_genaux_clean_input(upipe);
upipe_genaux_clean_ubuf_mgr(upipe);
upipe_genaux_clean_output(upipe);
upipe_genaux_clean_urefcount(upipe);
upipe_genaux_free_void(upipe);
}
upipe_genaux_alloc_void is declared by UPIPE_HELPER_VOID, allocates the big structure, and checks and uses the arguments (such as the probe hierarchy) passed to upipe_genaux_alloc. The other helpers are initialized, along with the only specific member (getattr). When the pipe is ready to accept control commands, it must throw the UPROBE_READY event.
Conversely, when the pipe no longer wishes to receive control commands, it shall throw the UPROBE_DEAD event. After that point it is not allowed to throw other events (including log messages), which is why our call to upipe_dbg_va comes beforehand.
In Upipe, message logging is handled by the functions upipe_err, upipe_warn, upipe_notice, upipe_dbg, and upipe_verbose, in decreasing order of importance. Applications may elect to print only messages that match a certain importance criterion. These functions only accept a plain string, but all have a _va counterpart (like many other functions in Upipe) which replaces the string with a printf-style format string and additional arguments. Internally, message logging works by throwing a special UPROBE_LOG event, which is caught by the application or the standard struct uprobe_stdio probe.
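The relationship between a plain-string logging function, its _va counterpart and a level filter can be sketched as follows. This is a self-contained model under invented names (struct logger, log_msg, log_msg_va and the enum values are hypothetical); it does not reproduce how Upipe throws UPROBE_LOG internally.

```c
#include <assert.h>
#include <stdarg.h>
#include <stdio.h>

/* Hypothetical levels, from least to most important. */
enum log_level { LOG_VERBOSE, LOG_DEBUG, LOG_NOTICE, LOG_WARNING, LOG_ERROR };

/* Hypothetical sink: remembers the last message instead of printing it. */
struct logger {
    enum log_level min_level; /* messages below this level are dropped */
    char last[128];
    int printed;
};

/* Plain-string variant. */
static void log_msg(struct logger *l, enum log_level level, const char *msg)
{
    assert(l != NULL);
    if (level < l->min_level)
        return;
    snprintf(l->last, sizeof(l->last), "%s", msg);
    l->printed++;
}

/* printf-style variant: formats into a buffer, then reuses log_msg. */
static void log_msg_va(struct logger *l, enum log_level level,
                       const char *format, ...)
{
    char buf[128];
    va_list args;
    va_start(args, format);
    vsnprintf(buf, sizeof(buf), format, args);
    va_end(args);
    log_msg(l, level, buf);
}
```

With min_level set to LOG_NOTICE, a LOG_DEBUG message is silently dropped while a LOG_ERROR message is formatted and recorded.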
upipe_genaux_free_void is declared by UPIPE_HELPER_VOID, releases the probes, and frees the big structure. Before that, the helpers must be deinitialized.
Controls
The control function is not mandatory, but if we want to handle flow definitions we need one:
static int upipe_genaux_control(struct upipe *upipe, int command, va_list args)
{
switch (command) {
case UPIPE_REGISTER_REQUEST: {
struct urequest *request = va_arg(args, struct urequest *);
if (request->type == UREQUEST_UBUF_MGR ||
request->type == UREQUEST_FLOW_FORMAT)
return upipe_throw_provide_request(upipe, request);
return upipe_genaux_alloc_output_proxy(upipe, request);
}
case UPIPE_UNREGISTER_REQUEST: {
struct urequest *request = va_arg(args, struct urequest *);
if (request->type == UREQUEST_UBUF_MGR ||
request->type == UREQUEST_FLOW_FORMAT)
return UBASE_ERR_NONE;
return upipe_genaux_free_output_proxy(upipe, request);
}
case UPIPE_GET_FLOW_DEF: {
struct uref **p = va_arg(args, struct uref **);
return upipe_genaux_get_flow_def(upipe, p);
}
case UPIPE_SET_FLOW_DEF: {
struct uref *flow_def = va_arg(args, struct uref *);
return upipe_genaux_set_flow_def(upipe, flow_def);
}
case UPIPE_GET_OUTPUT: {
struct upipe **p = va_arg(args, struct upipe **);
return upipe_genaux_get_output(upipe, p);
}
case UPIPE_SET_OUTPUT: {
struct upipe *output = va_arg(args, struct upipe *);
return upipe_genaux_set_output(upipe, output);
}
case UPIPE_GENAUX_SET_GETATTR: {
UBASE_SIGNATURE_CHECK(args, UPIPE_GENAUX_SIGNATURE)
return _upipe_genaux_set_getattr(upipe,
va_arg(args, int (*)(struct uref*, uint64_t*)));
}
case UPIPE_GENAUX_GET_GETATTR: {
UBASE_SIGNATURE_CHECK(args, UPIPE_GENAUX_SIGNATURE)
return _upipe_genaux_get_getattr(upipe,
va_arg(args, int (**)(struct uref*, uint64_t*)));
}
default:
return UBASE_ERR_UNHANDLED;
}
}
This function is generally just a big switch/case where we handle control commands one by one. UPIPE_REGISTER_REQUEST and UPIPE_UNREGISTER_REQUEST handle incoming requests for data structures. We use functions from the UPIPE_HELPER_OUTPUT to proxy the uref_mgr and uclock requests downstream (upipe_genaux_alloc_output_proxy). For ubuf manager and flow format, there is no need to forward downstream, as we will allocate new ubufs. We ask the application to provide the request with upipe_throw_provide_request.
UPIPE_GET_FLOW_DEF is supposed to return our output flow definition. UPIPE_HELPER_OUTPUT provides a generic implementation. Same goes for UPIPE_GET_OUTPUT and UPIPE_SET_OUTPUT.
UPIPE_SET_FLOW_DEF tells our pipe which input flow definition we must expect. It is covered in the next section.
Finally, UPIPE_GENAUX_SET_GETATTR and UPIPE_GENAUX_GET_GETATTR show an example of local control commands. The first argument of such control commands must be the signature of the pipe, which must be checked with UBASE_SIGNATURE_CHECK.
Flow definition handling
This function is almost mandatory in all pipes that accept an input:
static int upipe_genaux_set_flow_def(struct upipe *upipe, struct uref *flow_def)
{
if (flow_def == NULL)
return UBASE_ERR_INVALID;
struct uref *flow_def_dup;
if ((flow_def_dup = uref_dup(flow_def)) == NULL)
return UBASE_ERR_ALLOC;
if (unlikely(!ubase_check(uref_flow_set_def(flow_def_dup, "block.aux."))))
upipe_throw_fatal(upipe, UBASE_ERR_ALLOC);
upipe_input(upipe, flow_def_dup, NULL);
return UBASE_ERR_NONE;
}
It must return UBASE_ERR_NONE if the input flow definition is compatible with our pipe, and accepted; otherwise an error code is returned. UPIPE_SET_FLOW_DEF may be called at any point in the life of a pipe; the input flow definition may change at any moment. It is legal for a pipe to refuse to change a flow definition that would otherwise be compatible; in that case the application must deallocate/reallocate the same pipe type. It is mandatory to call UPIPE_SET_FLOW_DEF before inputting any packet.
We accept all kinds of input flow definition. We duplicate the flow definition packet (all structures passed to upipe_control belong to the caller) and send it to our input function, so that if packets have been buffered, the new flow definition will only take precedence once the buffer is emptied.
Data handling
The incoming packets are processed by the upipe_genaux_handle function:
static bool upipe_genaux_handle(struct upipe *upipe, struct uref *uref,
struct upump **upump_p)
{
struct upipe_genaux *upipe_genaux = upipe_genaux_from_upipe(upipe);
const char *def;
if (unlikely(ubase_check(uref_flow_get_def(uref, &def)))) {
upipe_genaux_store_flow_def(upipe, NULL);
upipe_genaux_require_ubuf_mgr(upipe, uref);
return true;
}
if (upipe_genaux->flow_def == NULL)
return false;
uint64_t systime = 0;
int size;
struct ubuf *dst;
uint8_t *aux;
if (!ubase_check(upipe_genaux->getattr(uref, &systime))) {
uref_free(uref);
return true;
}
size = sizeof(uint64_t);
dst = ubuf_block_alloc(upipe_genaux->ubuf_mgr, size);
if (unlikely(dst == NULL)) {
uref_free(uref);
upipe_throw_fatal(upipe, UBASE_ERR_ALLOC);
return true;
}
ubuf_block_write(dst, 0, &size, &aux);
upipe_genaux_hton64(aux, systime);
ubuf_block_unmap(dst, 0);
uref_attach_ubuf(uref, dst);
upipe_genaux_output(upipe, uref, upump_p);
return true;
}
The first part checks if the packet is a new input flow definition (see previous section). The old flow definition is released using upipe_genaux_store_flow_def, and a new ubuf manager is requested with upipe_genaux_require_ubuf_mgr. These functions are provided by UPIPE_HELPER_OUTPUT and UPIPE_HELPER_UBUF_MGR, respectively.
If the packet is a standard packet and no ubuf manager has been received (flow_def == NULL), the packet is buffered by returning false. Otherwise, the packet is processed as follows:
We allocate a struct ubuf with ubuf_block_alloc, of the size of a uint64_t.
We call ubuf_block_write to get a pointer to data (ubuf_block_alloc is supposed to return a contiguous memory space so there is no need to check for returned size), hton64 to write the buffer in network endianness, and ubuf_block_unmap to invalidate the aux pointer. There must be a ubuf_block_unmap for each ubuf_block_write or ubuf_block_read. The getattr function is supposed to have been previously set, and to return a uint64_t.
The new ubuf is attached to the uref using uref_attach_ubuf, and the previous ubuf is automatically released.
The uref is output to the next pipe with upipe_genaux_output, declared by UPIPE_HELPER_OUTPUT. If needed, it also does the magic with flow definitions.
The input function calls upipe_genaux_handle (second case), or directly buffers the incoming urefs if the buffer is not empty (first case):
static void upipe_genaux_input(struct upipe *upipe, struct uref *uref,
struct upump **upump_p)
{
if (!upipe_genaux_check_input(upipe)) {
upipe_genaux_hold_input(upipe, uref);
upipe_genaux_block_input(upipe, upump_p);
} else if (!upipe_genaux_handle(upipe, uref, upump_p)) {
upipe_genaux_hold_input(upipe, uref);
upipe_genaux_block_input(upipe, upump_p);
/* Increment upipe refcount to avoid disappearing before all packets
* have been sent. */
upipe_use(upipe);
}
}
If this is the first held packet (upipe_genaux_handle was called and returned false), we increment the reference count on the pipe so that it cannot be released until all buffered packets have been output.
The ubuf manager helper also requires a function that will be called when the ubuf manager is provided, and to which the amended flow definition is passed (for instance, downstream pipes could require a different alignment, or margins in a picture).
static int upipe_genaux_check(struct upipe *upipe, struct uref *flow_format)
{
struct upipe_genaux *upipe_genaux = upipe_genaux_from_upipe(upipe);
if (flow_format != NULL)
upipe_genaux_store_flow_def(upipe, flow_format);
if (upipe_genaux->flow_def == NULL)
return UBASE_ERR_NONE;
bool was_buffered = !upipe_genaux_check_input(upipe);
upipe_genaux_output_input(upipe);
upipe_genaux_unblock_input(upipe);
if (was_buffered && upipe_genaux_check_input(upipe)) {
/* All packets have been output, release again the pipe that has been
* used in @ref upipe_genaux_input. */
upipe_release(upipe);
}
return UBASE_ERR_NONE;
}
In our case, we store the new flow definition, which allows upipe_genaux_handle to run. We handle buffered packets with upipe_genaux_output_input, unblock potentially blocked pumps with upipe_genaux_unblock_input, and release the reference that was taken in upipe_genaux_input when the input was blocked. Both functions are provided by UPIPE_HELPER_INPUT.
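The interplay between the input function, the handler and the check function can be modelled without any Upipe dependency. In the sketch below, packets are plain integers and struct mini_pipe with its mini_* functions are hypothetical names; the model only mirrors the buffering and reference-counting logic, not the pump blocking.

```c
#include <assert.h>
#include <stdbool.h>

#define MAX_HELD 8

/* Hypothetical miniature pipe: not a Upipe structure. */
struct mini_pipe {
    bool ready;          /* true once the awaited resource has arrived */
    int held[MAX_HELD];  /* packets waiting for it, in arrival order */
    int nb_held;
    int refs;            /* reference count on the pipe */
    int output_sum;      /* stands in for the downstream pipe */
    int nb_output;
};

/* Process one packet; returns false if it must be buffered. */
static bool mini_handle(struct mini_pipe *p, int packet)
{
    if (!p->ready)
        return false;
    p->output_sum += packet;
    p->nb_output++;
    return true;
}

static void mini_hold(struct mini_pipe *p, int packet)
{
    assert(p->nb_held < MAX_HELD);
    p->held[p->nb_held++] = packet;
}

/* Input: preserve ordering by buffering whenever the buffer is not empty. */
static void mini_input(struct mini_pipe *p, int packet)
{
    if (p->nb_held > 0) {
        mini_hold(p, packet);
    } else if (!mini_handle(p, packet)) {
        mini_hold(p, packet);
        p->refs++;       /* first held packet: keep the pipe alive */
    }
}

/* Called when the resource is provided: flush, then drop the reference. */
static void mini_check(struct mini_pipe *p)
{
    p->ready = true;
    bool was_buffered = p->nb_held > 0;
    for (int i = 0; i < p->nb_held; i++)
        mini_handle(p, p->held[i]);
    p->nb_held = 0;
    if (was_buffered)
        p->refs--;
}
```

The extra reference is taken exactly once, when the buffer goes from empty to non-empty, and dropped exactly once, when it is flushed, so the count returns to its initial value.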
Using a pump
The pipe we have written is pretty much finished (and the upipe_genaux.c file doesn't do anything else). However, for the purpose of the tutorial, let's suppose that we want to throw the UPROBE_SOURCE_END event after a given timeout. We need to work with a upump manager, which represents the event loop of the thread. Fortunately there is the UPIPE_HELPER_UPUMP_MGR that will allow us to receive a upump manager using upipe_genaux_check_upump_mgr. Then, as soon as we have a upump manager (an example can be seen in upipe_file_source.c), we allocate a timer:
struct upump *upump = upump_alloc_timer(upipe_genaux->upump_mgr,
upipe_genaux_timer, upipe,
5 * UCLOCK_FREQ, 0);
upump_start(upump);
The callback upipe_genaux_timer will trigger once, after 5 seconds; it may look like this:
static void upipe_genaux_timer(struct upump *upump)
{
struct upipe *upipe = upump_get_opaque(upump, struct upipe *);
upipe_throw_source_end(upipe);
upump_free(upump);
}
You may want to store pumps in your pipe structure, and Upipe provides UPIPE_HELPER_UPUMP for that. Pumps need to be freed before the pipe is deallocated, otherwise the opaque pointer of the pump would be left dangling.
If your pipe has a upump manager, it should handle the UPIPE_ATTACH_UPUMP_MGR control command, which will tell it when to require a upump manager. The same goes for uclock, which is optional in many pipes.