This example shows how you can take advantage of dataflow programming to create distributed applications. It works as long as the data passed in the signal is serializable using Boost.Serialization.
Dataflow.Signals provides two
components which can be used to create a producer-consumer connection
between two computers: socket_sender
and socket_receiver.
As long as we have a network socket set up between two computers, we
can do the following to set up a connection between a SignalProducer
on one computer with a SignalConsumer
on another:
-
On the
SignalProducer's computer, construct a socket_sender of the appropriateSignaturewith the given socket. connect theSignalProducerto the socket_sender. -
On the
SignalConsumer's computer, construct a socket_receiver of the appropriateSignaturewith the given socket. connect the socket_receiver to theSignalConsumer.
That's it. Now, every signal sent out of the SignalProducer
should be received by the SignalConsumer.
The following is a modification of the example from the motivation section to a dataflow network that straddles a network socket:
#include <boost/dataflow/signals/component/socket_receiver.hpp> #include <boost/dataflow/signals/component/socket_sender.hpp> #include <boost/dataflow/signals/component/storage.hpp> #include <boost/dataflow/signals/component/timed_generator.hpp> #include <boost/dataflow/signals/connection.hpp> #include <boost/thread/thread.hpp> #include <boost/thread/mutex.hpp> #include <boost/thread/condition.hpp> #include "simple_example_components.hpp" using namespace boost; mutex mutex_; condition cond; bool server_thread_ready = false; asio::io_service io_service; // This function will set up an asio acceptor, and wait for a connection. // Once established, it will set up a signal network which will send // its final signal through the socket. void asio_server() { // set up the socket asio::ip::tcp::acceptor acceptor(io_service, asio::ip::tcp::endpoint(asio::ip::tcp::v4(), 1097)); asio::ip::tcp::socket socket(io_service); { boost::mutex::scoped_lock lock(mutex_); acceptor.listen(); server_thread_ready = true; cond.notify_all(); } acceptor.accept(socket); // For our data source, we will use timed_generator, // which creates its own thread and outputs it's stored value // at a specified time interval. We'll store a value of 0 to be sent out. // The signature void(double) specifies that the signal carries a double, // and that there is no return value. signals::timed_generator<void (double)> input(0); // To have our dataflow network straddle a network connection, // we need a socket_sender signals::socket_sender<void (double)> sender(socket); // ---Connect the dataflow network --------------------- // // ,---------. ,---------. // | input | --> | sender --- -- - (socket) // `---------' `---------' // // ----------------------------------------------------- input >>= sender; // Tell the source to start producing data, every 0.5s: input.enable(0.5); // take a little nap :-) boost::xtime xt; boost::xtime_get(&xt, boost::TIME_UTC); xt.sec += 10; boost::thread::sleep(xt); input.join(); } int main(int, char* []) { // start the server in a separate thread, and wait until it is listening boost::thread t(asio_server); { boost::mutex::scoped_lock lock(mutex_); while(!server_thread_ready) cond.wait(lock); } // set up the socket asio::ip::tcp::endpoint endpoint_recv(asio::ip::address::from_string("127.0.0.1"), 1097); asio::ip::tcp::socket socket(io_service); socket.connect(endpoint_recv); // Setup data processor and output: processor proc; output out; signals::socket_receiver<void (double)> receiver(socket); // ---Connect the dataflow network ----------------------------- // // ,---------. ,---------. ,---------. // (socket) - -- --- receiver | --> | proc | --> | out | // `---------' `---------' `---------' // // ------------------------------------------------------------- receiver >>= proc >>= out; // start asynchronous receiving receiver.async_read(); boost::thread receive_thread(boost::bind(&asio::io_service::run, boost::ref(io_service))); // and wait until the server is done sending t.join(); io_service.stop(); receive_thread.join(); return 0; } // int test_main(int, char* [])
A sample run produces:
0.213436 -0.49558 1.57538 -1.0592 1.83927 1.88577 ...