MPI Port
Here is my ''very'' preliminary work on the MPI infrastructure. There were a few deviations from the specification; however, they are all fairly minor. We still definitely need to talk about the linked_buffer implementation. The implementation as sketched out in the specification doesn't seem to have any easy means of filling the pointers table after create_linked_buffers has done its work.

Sorry about the formatting. The wiki strangely denied each of my attempts at attaching the files.

--[[User:Bgamari|Bgamari]] 19:48, 26 December 2009 (UTC)

I've updated the code below. The intermediate layer now includes a (probably incorrect) implementation of the pointers fixup table scheme described above. Note that this code obviously hasn't been compile-tested; it's just trying to get the concept across. Let me know what you think.

--[[User:Bgamari|Bgamari]] 21:00, 26 December 2009 (UTC)

== Low-level ==
<pre>
#include <cstdio>
#include <mpi/mpi.h>

typedef int rank_t;

// Map a C++ type onto the corresponding MPI datatype
template<typename T>
MPI_Datatype get_mpi_type() { return MPI_DATATYPE_NULL; }
template<> MPI_Datatype get_mpi_type<char>()   { return MPI_CHAR; }
template<> MPI_Datatype get_mpi_type<int>()    { return MPI_INT; }
template<> MPI_Datatype get_mpi_type<short>()  { return MPI_SHORT; }
template<> MPI_Datatype get_mpi_type<float>()  { return MPI_FLOAT; }
template<> MPI_Datatype get_mpi_type<double>() { return MPI_DOUBLE; }

// Error handler installed on MPI_COMM_WORLD: print the error and bail out
static void mpi_error(MPI_Comm *comm, int *stat, ...)
{
    int len;
    char err_string[MPI_MAX_ERROR_STRING];

    printf("MPI error number: %i\n", *stat);
    MPI_Error_string(*stat, err_string, &len);
    printf("%s\n", err_string);
    terminate(*stat);   // project-level shutdown routine
}

void init_machine(int argc, char **argv)
{
    int flag;
    MPI_Comm comm;
    MPI_Errhandler errhandler;

    flag = MPI_Init(&argc, &argv);
    comm = MPI_COMM_WORLD;
    if (flag) mpi_error(&comm, &flag);
    flag = MPI_Errhandler_create(mpi_error, &errhandler);
    if (flag) mpi_error(&comm, &flag);
    flag = MPI_Errhandler_set(MPI_COMM_WORLD, errhandler);
    if (flag) mpi_error(&comm, &flag);
}

void shutdown_machine()
{
    MPI_Finalize();
}

int get_num_nodes()
{
    int n;
    MPI_Comm_size(MPI_COMM_WORLD, &n);
    return n;
}

rank_t get_node_rank()
{
    rank_t rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    return rank;
}

void synchronize()
{
    MPI_Barrier(MPI_COMM_WORLD);
}

template<typename T>
void broadcast(T* buffer, int length)
{
    MPI_Bcast(buffer, length, get_mpi_type<T>(), 0, MPI_COMM_WORLD);
}

template<typename T>
void send(T* buffer, int length, rank_t dest, int tag = 0)
{
    MPI_Send(buffer, length, get_mpi_type<T>(), dest, tag, MPI_COMM_WORLD);
}

template<typename T>
void recv(T* buffer, int length, rank_t src, int tag = 0)
{
    MPI_Status status;
    MPI_Recv(buffer, length, get_mpi_type<T>(), src, tag, MPI_COMM_WORLD, &status);
}

template<typename T>
T sum(T value)
{
    T work;
    MPI_Allreduce(&value, &work, 1, get_mpi_type<T>(), MPI_SUM, MPI_COMM_WORLD);
    return work;
}

template<typename T>
void sum(T* value, T* out, int length)
{
    MPI_Allreduce(value, out, length, get_mpi_type<T>(), MPI_SUM, MPI_COMM_WORLD);
}
</pre>
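Here's a quick, untested sketch of how these wrappers are meant to be strung together. The main() harness and the per-rank "work" are invented purely for illustration; everything else uses only the names defined above.

<pre>
int main(int argc, char **argv)
{
    init_machine(argc, argv);

    rank_t rank = get_node_rank();
    int nodes   = get_num_nodes();

    // Rank 0 picks a problem size and broadcasts it to everyone else
    int n = (rank == 0) ? 1024 : 0;
    broadcast(&n, 1);

    // Every rank contributes a partial result; sum() reduces over all ranks
    double partial = (double)rank * n / nodes;   // stand-in for real per-rank work
    double total = sum(partial);

    if (rank == 0)
        printf("nodes=%d  total=%f\n", nodes, total);

    synchronize();
    shutdown_machine();
    return 0;
}
</pre>

The point is just that nothing outside the low-level file ever has to mention MPI_COMM_WORLD, tags, or MPI datatypes directly.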
+ | |||
+ | == Intermediate layer == | ||
<pre>
#include <map>
#include <utility>
#include <vector>

// rank_t comes from the low-level layer above; get_site_rank() is assumed
// to be provided by whatever code owns the lattice layout.

typedef int local_idx_t;
typedef int global_idx_t;

class comm_list {
public:
    // Represents a source_index -> dest_index pair
    typedef std::pair<global_idx_t, global_idx_t> comm_entry;

    // Maps a destination rank to a list of comm_entries
    std::map<rank_t, std::vector<comm_entry> > send;

    // Maps a source rank to a list of comm_entries
    std::map<rank_t, std::vector<comm_entry> > recv;

    // All of the intra-process comm_entries
    std::vector<comm_entry> local;

private:
    // Rank of this process
    rank_t rank;

    void add_entry(const comm_entry& ce) {
        global_idx_t src = ce.first, dest = ce.second;

        rank_t src_rank = get_site_rank(src);
        rank_t dest_rank = get_site_rank(dest);
        if (src_rank == rank && dest_rank == rank)
            local.push_back(ce);
        else if (src_rank == rank)
            send[dest_rank].push_back(ce);   // operator[] default-constructs the vector
        else if (dest_rank == rank)
            recv[src_rank].push_back(ce);
    }

public:
    struct entry_iterator {
        virtual void operator++() = 0;
        virtual comm_entry operator*() = 0;
        virtual bool end() = 0;
    };

    comm_list(rank_t rank, entry_iterator& iter) : rank(rank) {
        for (; !iter.end(); ++iter)
            add_entry(*iter);
    }

    comm_list(rank_t rank, const std::vector<comm_entry>& l) : rank(rank) {
        for (std::vector<comm_entry>::const_iterator i = l.begin(); i != l.end(); i++)
            add_entry(*i);
    }
};


template<typename T>
struct linked_buffer {
    typedef T slot_type;
    rank_t peer;
    const int length;
    std::vector<T> buffer;      // contiguous staging buffer handed to MPI
    std::vector<T*> pointers;   // one pointer per slot into the local field data

    linked_buffer(rank_t peer, int length) :
        peer(peer),
        length(length), buffer(length),
        pointers(length) { }

    // Copy the linked sites into the staging buffer before a send
    void gather()
    {
        for (int i = 0; i < length; i++)
            buffer[i] = *pointers[i];
    }

    // Copy the staging buffer back out to the linked sites after a receive
    void scatter()
    {
        for (int i = 0; i < length; i++)
            *pointers[i] = buffer[i];
    }
};

// TODO: Need to think about this a little more
template<typename T>
struct linked_buffer_set {
    typedef linked_buffer<T> buffer;
    std::map<rank_t, buffer> send;
    std::map<rank_t, buffer> recv;
    std::vector<buffer> local;

    // A fix-up records which pointers-table slot (T**) still needs to be
    // pointed at the site with the given global index
    typedef std::pair<T**, global_idx_t> fixup;
    std::vector<fixup> fixups;

    linked_buffer_set(comm_list& comm)
    {
        // Create send buffers
        for (std::map<rank_t, std::vector<comm_list::comm_entry> >::iterator process = comm.send.begin();
             process != comm.send.end(); process++) {
            rank_t peer = process->first;
            std::vector<comm_list::comm_entry>& entries = process->second;
            buffer& buf = send.insert(std::make_pair(peer, buffer(peer, (int)entries.size()))).first->second;

            // Create fix-ups: slot i must point at the source site of entry i
            std::vector<comm_list::comm_entry>::iterator entry = entries.begin();
            for (int i = 0; entry != entries.end(); entry++, i++) {
                global_idx_t src = entry->first;
                fixups.push_back(fixup(&buf.pointers[i], src));
            }
        }

        // Create recv buffers
        for (std::map<rank_t, std::vector<comm_list::comm_entry> >::iterator process = comm.recv.begin();
             process != comm.recv.end(); process++) {
            rank_t peer = process->first;
            std::vector<comm_list::comm_entry>& entries = process->second;
            buffer& buf = recv.insert(std::make_pair(peer, buffer(peer, (int)entries.size()))).first->second;

            // Create fix-ups: slot i must point at the destination site of entry i
            std::vector<comm_list::comm_entry>::iterator entry = entries.begin();
            for (int i = 0; entry != entries.end(); entry++, i++) {
                global_idx_t dest = entry->second;
                fixups.push_back(fixup(&buf.pointers[i], dest));
            }
        }
    }
};
</pre>
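For what it's worth, here is roughly how I picture the fix-up table being consumed once the buffers exist. The resolve callback, which maps a global index to the address of that site's local storage, is invented; whatever layout code owns the field data would have to supply something like it:

<pre>
// Walk the fix-up table and fill in the pointers tables of every buffer.
// resolve(g) is assumed to return the address of the local storage for
// global index g; it is a stand-in for whatever the layout layer provides.
template<typename T, typename Resolver>
void apply_fixups(linked_buffer_set<T>& bufs, Resolver resolve)
{
    typedef typename linked_buffer_set<T>::fixup fixup;
    for (typename std::vector<fixup>::iterator f = bufs.fixups.begin();
         f != bufs.fixups.end(); ++f) {
        T** slot = f->first;           // pointer-table slot recorded at build time
        global_idx_t idx = f->second;  // global site index it should refer to
        *slot = resolve(idx);
    }
}
</pre>

Once this has run, each linked_buffer's gather()/scatter() can walk its pointers table without any further knowledge of the layout.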