Dataflow.Signals can be used in multi-threaded environments. If you do so, it is recommended you use thread_safe_signals rather than Boost.Signals. thread_safe_signals can be installed as a drop-in replacement for Boost.Signals, but to use it with Dataflow.Signals, you currently need to use the following #defines before including any Dataflow.Signals or thread_safe_signals/ Boost.Signals headers:

#define signalslib signals
#define signals signals

This will cause thread_safe_signals to use the boost::signals namespace, rather than the boost::signalslib namespace it uses by default.

Provided components

Dataflow.Signals provides some components specifically intended for multi-threaded applications. These are:

  • mutex (provides mutexing locking on incoming signals)
  • condition (notifies a threading condition whenever a signal is received)
  • timed_generator (periodically generates signals in its own thread)

The documentation pages for each of these components provide more information.

Example of Creating a New Threadpool Component

Here is an example of developing a new component specifically intended for threading. The component makes use of the Threadpool library by Oliver Kowalke. We will call it async_component.

The purpose of the component is to make a signal call asynchronous. Instead of the consumer component processing the signal immediately, the signal call will be added to a thread pool and processed later. async_component is a class template, templated on the threadpool type, and the type of the underlying component that will process the asynchronous signal.

Let's begin with the use code that this component will allow us to use. The three examples provided are:

  • mixing synchronous and asynchronous signals
  • assigning priorities to tasks / signals
  • using asynchronous signals for cyclic networks
mixing synchronous and asynchronous signals

Our first example will be just a simple network that involves both synchronous and asynchronous filters. The filters will use a simple function that performs an increment:

// just an operation to work with
int inc_fn(int x)
{
    std::cout << "filter: " << x+1 << std::endl;
    return x+1;
}

Here is the simple example:

// our Threadpool type - simple FIFO thread pool.
typedef
    tp::pool<
        tp::fixed,
        tp::unbounded_channel< tp::fifo >
    > threadpool_type;

// we limit the number of threads to 5
threadpool_type pool(tp::max_poolsize(5));

// the component types:
// The source will store an int
typedef boost::signals::storage<void(int)> source_type;
// A filter will process the int
typedef boost::signals::function<void(int), int(int)> filter_type;
// This filter will process the int asynchronously
typedef async_component<threadpool_type, filter_type> async_filter_type;

// our components:
// We start with a 0
source_type source(0);

// an assortment of synchronous and asynchronous increase-filters
async_filter_type increase1(pool, inc_fn);
filter_type increase2(inc_fn);
filter_type increase3(inc_fn);
async_filter_type increase4(pool, inc_fn);
async_filter_type increase5(pool, inc_fn);

// our network
//  increase1 >>= increase2 >>= increase3 will be in its own thread
//  increase4 will be in its own thread
//  increase5 will be in its own thread
source
    | (increase1 >>= increase2 >>= increase3)
    | (increase4 >>= increase5);
// this was equivalent to:
// connect(source, increase1);
// connect(increase1, increase2);
// connect(increase2, increase3);
// connect(source, increase4);
// connect(increase4, increase5);

// submit the first task
submit(pool, source);

And a sample output of a run (output contains adding task whenever a new task has been added to the pool):

adding task
adding task
filter: 1
filter: 2
filter: 3
filter: 1
adding task
filter: 2
assigning priorities to tasks / signals

The second example involves a priority pool that allows us to specify the priority of a task. This example and the next will use the following filter component, which just prints some texts, waits a little, and forwards the signal. This example only uses the text printing capability of the component:

// a component that displays text, waits an optional period, and then forwards the signal
class printer : public boost::signals::filter<printer, void()>
{
public:
    printer(const std::string &text, unsigned wait_milliseconds=0)
        : m_text(text), m_wait_milliseconds(wait_milliseconds)
    {}
    void operator()()
    {
        std::cout << m_text << std::endl;
        boost::this_thread::sleep(boost::posix_time::milliseconds(m_wait_milliseconds));
        out();
    }
private:
    std::string m_text;
    unsigned m_wait_milliseconds;
};

The example uses the priorities to enforce an order of execution that prints a particular message:

// a pool that uses priorities for tasks
typedef 
    tp::pool<
        tp::fixed,
        tp::unbounded_channel< tp::priority< int > >
    > priority_threadpool_type;

// we limit the pool to one thread
priority_threadpool_type priority_pool(tp::max_poolsize(1));

// components:
typedef boost::signals::storage<void()> print_starter_type;
typedef async_component<priority_threadpool_type, printer> async_printer_type;

// this will start the execution
print_starter_type print_starter;
// this will print the text - we assign priorities so that the print
// order is "Hello","<>","World!","!":
async_printer_type print1(priority_pool, 3, "World");
async_printer_type print2(priority_pool, 2, "<>");
async_printer_type print3(priority_pool, 1, "Hello");
async_printer_type print4(priority_pool, 4, "!");

// our network:
print_starter
    | print1
    | print2
    | print3
    | print4;
// this was equivalent to:
// connect(print_starter, print1);
// connect(print_starter, print2);
// connect(print_starter, print3);
// connect(print_starter, print4);

    
// submit the first task
submit(priority_pool, 1, print_starter);

The output is:

adding task
adding task
adding task
adding task
Hello
<>
World
!
using asynchronous signals for cyclic networks

The final example demonstrates the use of a threadpool and asynchronous signals to allow cyclic networks. The following example induces a cycle consisting of ticker1, ticker2, and ticker3.

// our components:
typedef boost::signals::storage<void()> tick_starter_type;
typedef async_component<threadpool_type, printer> ticker_type;

// this will start the execution
tick_starter_type tick_starter;
// each component will print a line and wait a second before
// forwarding the signal
ticker_type
    ticker1(pool, "tick 1...", 1000),
    ticker2(pool, "tick 2...", 1000),
    ticker3(pool, "tick 3...", 1000);

// our network (note the ticker1, ticker2, ticker3 cycle)
tick_starter >>= ticker1 >>= ticker2 >>= ticker3 >>= ticker1;
// this was equivalent to:
// connect(tick_starter, ticker1);
// connect(ticker1, ticker2);
// connect(ticker2, ticker3);
// connect(ticker3, ticker1);

// submit the first task
submit(pool, tick_starter);

// wait a little
boost::this_thread::sleep(boost::posix_time::seconds(5));

// shutdown the pool (no more new tasks will be processed)
pool.shutdown();

adding task
tick 1...
adding task
tick 2...
adding task
tick 3...
adding task
tick 1...
adding task
tick 2...
adding task
async_component implementation

Finally, for those interested, here is the implementation of the async_component class template. Since it operates on an underlying component, the implementation is not trivial. Nonetheless, the Dataflow.Signals building blocks still allow the implementation to be rather concise, considering that:

  • async_component can be used on a numer of underlying components, regardless of signal signature.
  • async_component works with both priority and non-priority thread pools.

// Copyright 2008 Stjepan Rajko.
// Distributed under the Boost Software License, Version 1.0. (See
// accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)

#include <iostream>
#include <cstdlib>
#include <stdexcept>

#include <boost/future/future.hpp>
#include <boost/fusion/include/fused.hpp>
#include <boost/fusion/include/join.hpp>
#include <boost/fusion/include/make_vector.hpp>

#define signalslib signals
#define signals signals

#include <boost/dataflow/signals/component/storage.hpp>
#include <boost/dataflow/signals/component/function.hpp>
#include <boost/dataflow/signals/connection/operators.hpp>
#include <boost/dataflow/utility/bind_mem_fn.hpp>
#include <boost/dataflow/utility/bind_functor.hpp>

#include <boost/tp/info.hpp>
#include "boost/tp/fifo.hpp"
#include "boost/tp/lazy.hpp"
#include "boost/tp/pool.hpp"
#include "boost/tp/poolsize.hpp"
#include "boost/tp/bounded_channel.hpp"
#include "boost/tp/priority.hpp"


namespace tp = boost::tp;

template<typename Threadpool, typename Component>
class async_component;

namespace detail {

    // async_component_impl_base will contain the implementation details
    // shared between the priority and non-priority versions    
    template<typename Threadpool, typename Component>
    class async_component_impl_base
    // filter_base is like filter, but it doesn't come with it's own signal.
    // instead, the derived class must provide a default_signal member function
    // that refers to the default signal.  We will use this to return the
    // default signal of the underlying component.
         : public boost::signals::filter_base<async_component<Threadpool, Component>, typename Component::signal_type >
    {
    public:
        typedef typename Component::signature_type signature_type;
        typedef typename Component::signal_type signal_type;
        typedef void result_type;
        
        // the constructors will just initialize the threadpool reference,
        // the underlying component, and m_component_function
        template<typename T0>
        async_component_impl_base(Threadpool &threadpool, const T0 &t0)
            : m_component(t0), m_threadpool(threadpool)
        {
            init_m_component_function();
        }

        template<typename T0, typename T1>
        async_component_impl_base(Threadpool &threadpool, const T0 &t0, const T1 &t1)
            : m_component(t0, t1), m_threadpool(threadpool)
        {
            init_m_component_function();
        }
        
        // with the following, anything that connects to the async_component's default
        // signal will actually connect to the default signal of the underlying
        // component
        typename Component::signal_type &default_signal()
        {
            namespace dataflow = boost::dataflow;
            return dataflow::get_default_port<
                    dataflow::args::left,
                    dataflow::signals::connect_mechanism, 
                    dataflow::signals::tag
                > (m_component);
        }

    protected:
        Component m_component;
        Threadpool &m_threadpool;
        boost::function<signature_type> m_component_function;
        
    private:
        // record the appropriate operator() overload of Component into m_component_function,
        // so we can submit it as a task later
        void init_m_component_function()
        {
            typedef typename boost::dataflow::utility::slot_type<signature_type, Component>::type mem_fn_type;

            m_component_function = boost::dataflow::utility::bind_mem_fn<mem_fn_type, Component>
                (static_cast<mem_fn_type>(&Component::operator()), m_component);
        }
    };
    
    template<typename Threadpool, typename Component, typename Enable=void>
    class async_component_impl;
    
    // the non-priority version (for non-priority thread pools)
    template<typename Threadpool, typename Component>
    class async_component_impl<Threadpool, Component,
        typename boost::disable_if<tp::has_priority<Threadpool> >::type>
        : public async_component_impl_base<Threadpool, Component>
    {
    public:
        // forwarding constructors
        template<typename T0>
        async_component_impl(Threadpool &threadpool, const T0 &t0)
            : async_component_impl_base<Threadpool, Component>(threadpool, t0)
        {}

        template<typename T0, typename T1>
        async_component_impl(Threadpool &threadpool, const T0 &t0, const T1 &t1)
            : async_component_impl_base<Threadpool, Component>(threadpool, t0, t1)
        {}
        
        // when receiving the signal (which we get in a Fusion sequence)
        // we will submit the task
        template <class Seq>
        void operator()(const Seq &vec_par) const
        {
            // add the next function as a task in the pool
            std::cout << "adding task" << std::endl;
            // bind_functor is just a function object that calls bind
            // we create a fused version so we can call it with a fusion sequence
            boost::fusion::fused<boost::dataflow::utility::bind_functor> fused_bind;
            
            // submit the task (the first parameter to bind is the function,
            // and the rest are the bound function arguments).
            boost::tp::task< void > t(
                async_component_impl::m_threadpool.submit(
                    fused_bind(
                        boost::fusion::join(
                            boost::fusion::make_vector(async_component_impl::m_component_function),
                            vec_par
                    )   )
                    ));
        }

    };

    template<typename Threadpool, typename Component>
    class async_component_impl<Threadpool, Component,
        typename boost::enable_if<tp::has_priority<Threadpool> >::type>
        : public async_component_impl_base<Threadpool, Component>
    {
        typedef typename tp::priority_type<Threadpool>::type priority_type;
    public:
        // constructor stores the priority, and forwards the rest
        template<typename T0>
        async_component_impl(Threadpool &threadpool, const priority_type &priority, const T0 &t0)
            : async_component_impl_base<Threadpool, Component>(threadpool, t0)
            , m_priority(priority)
        {}
        
        // when receiving the signal (which we get in a Fusion sequence)
        // we will submit the task with the stored priority.
        template <class Seq>
        void operator()(const Seq &vec_par) const
        {
            // add the next function as a task in the pool
            std::cout << "adding task" << std::endl;
            // bind_functor is just a function object that calls bind
            // we create a fused version so we can call it with a fusion sequence
            boost::fusion::fused<boost::dataflow::utility::bind_functor> fused_bind;
            
            // submit the task (the first parameter to bind is the function,
            // and the rest are the bound function arguments).
            boost::tp::task< void > t(
                async_component_impl::m_threadpool.submit(
                    fused_bind(
                        boost::fusion::join(
                            boost::fusion::make_vector(async_component_impl::m_component_function),
                            vec_par
                    )   ),
                    m_priority
                    ));
        }
    private:
        priority_type m_priority;
    };

}


// our new async_component class - it will create a new task for the underlying
// component when it's operator() is called.
// unfused_inherited is an adaptor provided by Dataflow.Signals,
// which will allow the above implementation (which uses fused signals)
// to work with unfused signal signatures (fused / unfused in the Boost.Fusion
// sense)
template<typename Threadpool, typename Component>
class async_component : public boost::fusion::unfused_inherited<
    detail::async_component_impl<Threadpool,Component>,
    typename boost::function_types::parameter_types<typename Component::signature_type>::type >
{
    typedef boost::fusion::unfused_inherited<
        detail::async_component_impl<Threadpool,Component>,
        typename boost::function_types::parameter_types<typename Component::signature_type>::type>
        base_type;
public:
    // just forwarding constructors
    template<typename T0>
    async_component(Threadpool &threadpool, const T0 &t0)
        : base_type(threadpool, t0)
    {}
    template<typename T0, typename T1>
    async_component(Threadpool &threadpool, const T0 &t0, const T1 &t1)
        : base_type(threadpool, t0, t1)
    {}
};

// a function to submit the first task
template<typename Threadpool, typename Component>
void submit(Threadpool &threadpool, Component &component)
{
    tp::task< void > task(
        threadpool.submit(
            boost::bind(&Component::send, boost::ref(component))));
}

// a function to submit the first task with a priority
template<typename Threadpool, typename Component>
void submit(
    Threadpool &threadpool,
    const typename tp::priority_type<Threadpool>::type priority,
    Component &component)
{
    tp::task< void > task(
        threadpool.submit(
            boost::bind(&Component::send, boost::ref(component)), priority));
}