Introduction

    Paracel is a distributed optimization framework designed for many machine learning problems, such as logistic regression, SVD, matrix factorization, and LDA. It provides a distributed, global key-value space called the parameter server. Messages in Paracel (also referred to as parameters) must have a key-value structure. To communicate, you interact with the parameter servers by reading, writing, and updating messages. This is a quick tutorial that should take no more than 20 minutes to complete.
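
    Below is a minimal sketch of that key-value interaction pattern, written as a fragment of a paralg subclass and using the paracel_write / paracel_read / paracel_bupdate calls introduced later in this tutorial; the key name, the value, and the update library shown here are placeholders, not part of a real application.

// Minimal sketch of the key-value interaction pattern (fragment of a
// paralg subclass; "demo_key", the value, and the update library are
// hypothetical placeholders).
void demo_communication() {
  std::vector<double> local_val = {1., 2., 3.};
  paracel_write("demo_key", local_val);                 // write a parameter to the servers
  auto remote_val =
      paracel_read<std::vector<double> >("demo_key");   // read it back
  paracel_bupdate("demo_key", local_val,
                  "update.so", "my_updater");           // merge an update on the servers
}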

Get Started

    Download the Paracel project from GitHub: https://github.com/douban/paracel. Build Paracel and all its dependencies on your cluster using CMake, following the deployment section in the API reference document. Before going further, make sure that you have successfully installed Paracel; to verify this, follow the instructions here to test it out. In the following, we assume that you have installed Paracel in the /opt/paracel folder; adjust the path if you installed it somewhere else.

    Generate a dataset; here we use word count for simplicity.

python /opt/paracel/tool/datagen.py -m wc -o data.txt
    

    Create a JSON file named demo_cfg.json as below:

{
    "input" : "data.txt",
    "output" : "./wc_output/",
    "topk" : 10,
    "handle_file" : "/opt/paracel/lib/libwc_update.so",
    "update_function" : "wc_updater",
    "filter_function" : "wc_filter"
}
    

    Use prun.py to start the program (4 workers, 2 servers, local mode in this example).

/opt/paracel/prun.py -w 4 -p 2 -c demo_cfg.json -m local /opt/paracel/bin/wc
    

    You will get the top-10 word list:

the : 62075
and : 38850
of : 34434
to : 13384
And : 12846
that : 12577
in : 12334
shall : 9760
he : 9666
unto : 8940
    

Write a Paracel Application

    There are two strong reasons to choose Paracel for developing your applications. Firstly, compared to the MapReduce and GraphLab frameworks, the parameter server paradigm makes it cheap to change the model: you can add new features to your application without refactoring the whole implementation. Secondly, a Paracel program is very close to its sequential version; developers usually write a sequential version first and then parallelize it by modifying a few lines of code. For machine learning problems in Paracel, the usual process is as follows (a skeleton of this loop is sketched after the list):

  1. During initialization, load the training data and push the model and any extra information to the parameter servers.
  2. At the beginning of each iteration, pull the related model down from the parameter servers.
  3. Calculate locally, and push the incremental update of the model to the parameter servers.
  4. Repeat steps 2-3 until the given convergence condition is met.
  5. Pull the latest model (last pull) from the parameter servers and output the final results.
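
    As a rough illustration, the body of a solve() method in a paralg subclass typically follows the shape below; the key name "model", init_model, converged(), compute_local_delta(), and the update library are hypothetical placeholders, not part of any real application.

// Skeleton of the usual pull / compute / update loop (illustrative fragment
// of a paralg subclass; "model", init_model, converged(), compute_local_delta()
// and the update library are hypothetical placeholders).
virtual void solve() {
  auto lines = paracel_load(input);                                // 1. load training data
  paracel_write("model", init_model);                              //    push initial model
  while(!converged()) {
    auto model = paracel_read<std::vector<double> >("model");      // 2. pull model
    auto delta = compute_local_delta(model, lines);                // 3. compute locally
    paracel_bupdate("model", delta, "update.so", "my_updater");    //    merge increment on servers
  }
  auto final_model = paracel_read<std::vector<double> >("model");  // 5. last pull, then output results
}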

  Word Count Example

    Let's start with a simple word count example. Firstly, write a subclass that inherits from the paralg base class:

// wc.hpp
#include "ps.hpp" // paralg
class word_count : public paralg {
public:
  word_count(paracel::Comm comm,
             string hosts_dct_str,
             string in,
             string out,
             int topk) :
  paralg(hosts_dct_str, comm, out),
  input(in),
  output(out),
  ktop(topk) { /* init other local member vars here */ }

private:
  string input, output;
  int ktop;
}; // class word_count

    Secondly, write an update function in update.cpp (similar to the reduce function in MapReduce) and compile it to update.so using CMake.

// update.cpp
#include "proxy.hpp" // update_proxy
#include "paracel_types.hpp" // update_result

int local_reduce(int value, int delta) {
  return value + delta;
}

update_result updater = update_proxy(local_reduce); // register the updater with the Paracel framework
    

    Thirdly, implement the virtual method solve to work out the problem. For example:

// wc.hpp
private:
  void local_learning(const vector<string> & lines) {
    paracel_register_bupdate("update.so", "updater"); // register update function in the subclass
    for(auto & line : lines) {
      auto word_lst = local_parser(line); // parse line to a word list
      for(auto & word : word_lst) {
        paracel_update_default(word, 1); // similar to dict.setdefault in Python
      }
    }
    paracel_sync(); // wait for every worker to finish
    auto result = paracel_read_topk(ktop); // get the top-k (word, count) list
    // dump result into output
  }

public:
  virtual void solve() {
    auto lines = paracel_load(input); // parallel load input to lines
    local_learning(lines);
  }
    

    Lastly, write a main function in main.cpp:

// main.cpp
#include <google/gflags.h>
#include "utils.hpp"
#include "wc.hpp"
DEFINE_string(server_info, "host1:7777", "connection info of the parameter server(s)"); // default is a placeholder
DEFINE_string(cfg_file, "", "path to the JSON config file");
int main(int argc, char *argv[])
{
  // init worker environment
  paracel::main_env comm_main_env(argc, argv);
  paracel::Comm comm(MPI_COMM_WORLD);

  // init parameter server environment
  google::SetUsageMessage("[options]\n\t--server_info\n");
  google::ParseCommandLineFlags(&argc, &argv, true);

  // you can also init local parameters from other formats (we recommend JSON)
  string input = "data.txt";
  string output = "result.txt";
  int ktop = 10;

  // create an instance
  word_count solver(comm, FLAGS_server_info, input, output, ktop);
  // solve problems
  solver.solve();
  return 0;
}
    

    You are almost done! The only thing left to do is to compile the code above (see the CMakeLists.txt of the other algorithms in their directories for examples), then run it with the prun.py script.

  Second Version of Word Count

    In the word count example above, the lines on each worker may contain many occurrences of the same words, so we can combine the counts locally before sending them:

// wc.hpp
private:
  void local_learning(const vector<string> & lines) {
    std::unordered_map<std::string, int> tmp;
    for(auto & line : lines) {
      auto word_lst = parser(line);
      for(auto & word : word_lst) {
        tmp[word] += 1;
      }
    }
    for(auto & wc : tmp) {
      paracel_bupdate(wc.first, wc.second, "update.so", "updater");
    }
    paracel_sync();
    paracel_read_topk(ktop, result); // read the top-k (word, count) pairs into result
  }

  Third Version of Word Count

    If you run the second version of word count, you will find that the program runs very slowly. This is a very common problem users encounter when writing distributed algorithms for the first time: communication time dominates computation time, so the speedup that Amdahl's law would otherwise allow is lost. There is no free lunch, so you must try to reduce the communication cost. A commonly used strategy is to pack small messages into larger ones to save network latency. In the following code, we create local_vd, a list of local word dictionaries bucketed by hash, so that each bulk update sends a whole dictionary to the parameter servers instead of many single-word messages. In this case, you have to write a new update function that merges the local word dictionaries into the global word count mapping. We omit its implementation here (see the wc.hpp source code for details), though a rough sketch follows the code below. Don't worry; you are already able to use Paracel in action. Congratulations!

// wc.hpp
private:
  void local_learning(const vector<string> & lines) {
    std::unordered_map<std::string, int> tmp;
    for(auto & line : lines) {
      auto word_lst = parser(line);
      for(auto & word : word_lst) {
        tmp[word] += 1;
      }
    }
    int dct_sz = 100;
    std::vector<std::unordered_map<std::string, int> > local_vd(dct_sz);
    paracel::hash_type<std::string> hfunc;
    for(auto & kv : tmp) {
      auto indx = hfunc(kv.first) % dct_sz;
      local_vd[indx][kv.first] = kv.second;
    }
    paracel_sync();
    for(int k = 0; k < dct_sz; ++k) {
      paracel_bupdate("key_" + std::to_string(k),
		local_vd[k],
		"update.so",
		"optimize_updater");
    }
    paracel_sync();
    // get topk result
    ...
  }
}
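
    For illustration only, here is a hedged sketch of what such a merging updater could look like, reusing the update_proxy registration pattern shown earlier. Whether update_proxy accepts map-typed arguments like this is an assumption on our part, so treat it as pseudocode and consult the real Paracel source.

// optimize_update.cpp -- illustrative sketch of a dictionary-merging updater
// (hypothetical; the shipped implementation may differ)
#include <string>
#include <unordered_map>
#include "proxy.hpp"          // update_proxy
#include "paracel_types.hpp"  // update_result

// merge a local word dictionary into the global one by summing counts
std::unordered_map<std::string, int>
local_merge(std::unordered_map<std::string, int> global_dct,
            std::unordered_map<std::string, int> local_dct) {
  for(auto & kv : local_dct) {
    global_dct[kv.first] += kv.second;
  }
  return global_dct;
}

update_result optimize_updater = update_proxy(local_merge); // register with the framework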

  Logistic Regression Example

    In this part, with a simple logistic regression example, we will show you how to use asynchronous control to speed up your application in Paracel. The key point is that the distributed version is very close to the sequential version, and you only need to add a few lines of code to make it asynchronous. The sequential version is really straightforward; we use the SGD optimization method to minimize the negative log-likelihood objective function:

void logistic_regression::solve() {
  // init data
  auto lines = paracel_load(input);
  local_parser(lines);
  serial_learning();
}

void logistic_regression::serial_learning() {
  theta = paracel::random_double_list(data_dim);
  for(int rd = 0; rd < rounds; ++rd) {
    for(int i = 0; i < data_dim; ++i) {
      delta[i] = 0.;
    }
    random_shuffle(idx.begin(), idx.end());
    for(auto sample_id : idx) {
      for(int i = 0; i < data_dim; ++i) {
        delta[i] += coff1 * samples[sample_id][i] - coff2 * theta[i];
      }
      for(int i = 0; i < data_dim; ++i) {
        theta[i] += delta[i];
      }
    } // traverse
  }
}
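
    The coefficients coff1 and coff2 are computed elsewhere in the example and are not shown above. One plausible reading, stated here as an assumption rather than taken from the snippet, is that with learning rate \alpha, label y, sigmoid \sigma, and L2 regularization weight \beta,

\text{coff1} = \alpha \, \bigl( y_{\text{sample\_id}} - \sigma(\theta^{\top} x_{\text{sample\_id}}) \bigr), \qquad \text{coff2} = \alpha \, \beta,

    so that each term coff1 * samples[sample_id][i] - coff2 * theta[i] is one sample's contribution to a regularized gradient step on theta[i].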

    To parallelize it, you follow the training model in Paracel: pull the parameters before training and push the local updates after training. The code is very close to the previous version (a sketch of the lg_theta_update function it registers follows the code):

void logistic_regression::solve() {
  // init data
  auto lines = paracel_load(input);
  local_parser(lines);
  paracel_sync();
  parallel_learning();
}

void logistic_regression::parallel_learning() {
  theta = paracel::random_double_list(data_dim);
  paracel_write("theta", theta); // init push
  for(int rd = 0; rd < rounds; ++rd) {
    for(int i = 0; i < data_dim; ++i) {
      delta[i] = 0.;
    }
    random_shuffle(idx.begin(), idx.end());
    theta = paracel_read<vector<double> >("theta"); // pull theta
    for(auto sample_id : idx) {
      for(int i = 0; i < data_dim; ++i) {
        delta[i] += coff1 * samples[sample_id][i] - coff2 * theta[i];
      }
    } // traverse

    paracel_bupdate("theta", delta, "update.so", "lg_theta_update"); // update with delta in parameter servers
  }
  theta = paracel_read<vector<double> >("theta"); // last pull
}
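
    The lg_theta_update function lives in update.so, just like the word count updater, and its implementation is not shown in this tutorial. A hedged sketch of what it might look like, reusing the update_proxy pattern from the word count example (the vector-typed proxy here is an assumption), is:

// lg_update.cpp -- illustrative sketch, not the shipped implementation
#include <vector>
#include "proxy.hpp"          // update_proxy
#include "paracel_types.hpp"  // update_result

// add a worker's local delta into the global theta, element-wise
std::vector<double> local_add(std::vector<double> theta,
                              std::vector<double> delta) {
  for(size_t i = 0; i < theta.size(); ++i) {
    theta[i] += delta[i];
  }
  return theta;
}

update_result lg_theta_update = update_proxy(local_add); // register with the framework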

    Asynchronous learning is a very important feature in Paracel for speeding up convergence. You can add four lines of code to the logistic regression example to make it work. In the constructor of your subclass, turn on the ssp switch and set a limit_s parameter, which means the fastest worker can be at most limit_s iterations ahead of the slowest worker. Set the total number of iterations during initialization, and commit to the parameter servers at the end of each iteration.

logistic_regression::logistic_regression(paracel::Comm comm,
                                        std::string hosts_dct_str,
                                        std::string _output,
                                        int _rounds)
    : paracel::paralg(hosts_dct_str,
                      comm,
                      _output,
                      _rounds,
                      3, // set limit_s
                      true /* open ssp switch */ ) {}

void logistic_regression::solve() {
  // init data
  auto lines = paracel_load(input);
  local_parser(lines);
  set_total_iters(rounds); // set total iterations
  paracel_sync();
  dgd_learning();
}

void logistic_regression::dgd_learning() {
  theta = paracel::random_double_list(data_dim);
  paracel_write("theta", theta); // init push
  for(int rd = 0; rd < rounds; ++rd) {
    for(int i = 0; i < data_dim; ++i) {
      delta[i] = 0.;
    }
    random_shuffle(idx.begin(), idx.end());
    theta = paracel_read<vector<double> >("theta"); // pull theta
    for(auto sample_id : idx) {
      for(int i = 0; i < data_dim; ++i) {
        delta[i] += coff1 * samples[sample_id][i] - coff2 * theta[i];
      }
    } // traverse

    paracel_bupdate("theta", delta, "update.so", "lg_theta_update"); // update with delta
    iter_commit(); // commit to parameter server at the end of each iteration
  }
  theta = paracel_read<vector<double> >("theta"); // last pull
}

    That's it for the quick tour of Paracel. We hope you enjoy using it.
