MPI Port

Here is my ''very'' preliminary work on the MPI infrastructure. There were a few deviations from the specification; however, they are all fairly minor. We still definitely need to talk about the linked_buffer implementation: as sketched out in the specification, there doesn't seem to be any easy way to fill the pointers table after create_linked_buffers has done its work.

Sorry about the formatting; the wiki strangely rejected each of my attempts to attach the files.

--[[User:Bgamari|Bgamari]] 19:48, 26 December 2009 (UTC)

I've updated the code below. The intermediate layer now includes a (probably incorrect) implementation of the pointers fixup table scheme described above. Note that this code obviously hasn't been compile-tested; it's just trying to get the concept across. Let me know what you think.

--[[User:Bgamari|Bgamari]] 21:00, 26 December 2009 (UTC)

== Low-level ==


<pre>
#include <mpi.h>
#include <cstdio>

typedef int rank_t;

// Default message tag used by send()/recv(); a placeholder until we
// decide how tags should actually be assigned.
static const int tag = 0;

// Map a C++ type onto the corresponding MPI datatype. The primary
// template is left undefined so that unsupported types fail to link.
template<typename T> MPI_Datatype get_mpi_type();
template<> MPI_Datatype get_mpi_type<char>()   { return MPI_CHAR; }
template<> MPI_Datatype get_mpi_type<int>()    { return MPI_INT; }
template<> MPI_Datatype get_mpi_type<short>()  { return MPI_SHORT; }
template<> MPI_Datatype get_mpi_type<float>()  { return MPI_FLOAT; }
template<> MPI_Datatype get_mpi_type<double>() { return MPI_DOUBLE; }

// Error handler: print the MPI error string and bail out.
// terminate() is assumed to be provided elsewhere in the code base.
static void mpi_error(MPI_Comm *comm, int *stat, ...)
{
	int len;
	char err_string[MPI_MAX_ERROR_STRING];

	printf("MPI error number: %i\n", *stat);
	MPI_Error_string(*stat, err_string, &len);
	printf("%s\n", err_string);
	terminate(*stat);
}

// Initialize MPI and install mpi_error as the error handler on MPI_COMM_WORLD
void init_machine(int argc, char **argv)
{
	int flag;
	MPI_Comm comm;
	MPI_Errhandler errhandler;

	flag = MPI_Init(&argc, &argv);
	comm = MPI_COMM_WORLD;
	if (flag) mpi_error(&comm, &flag);
	flag = MPI_Errhandler_create(mpi_error, &errhandler);
	if (flag) mpi_error(&comm, &flag);
	flag = MPI_Errhandler_set(MPI_COMM_WORLD, errhandler);
	if (flag) mpi_error(&comm, &flag);
}

void shutdown_machine()
{
	MPI_Finalize();
}

int get_num_nodes()
{
	int n;
	MPI_Comm_size(MPI_COMM_WORLD, &n);
	return n;
}

rank_t get_node_rank()
{
	rank_t rank;
	MPI_Comm_rank(MPI_COMM_WORLD, &rank);
	return rank;
}

void synchronize()
{
	MPI_Barrier(MPI_COMM_WORLD);
}

// Broadcast length elements from rank 0 to all other ranks
template<typename T>
void broadcast(T* buffer, int length)
{
	MPI_Bcast(buffer, length, get_mpi_type<T>(), 0, MPI_COMM_WORLD);
}

// Blocking point-to-point send/recv using the default tag
template<typename T>
void send(T* buffer, int length, rank_t dest)
{
	MPI_Send(buffer, length, get_mpi_type<T>(), dest, tag, MPI_COMM_WORLD);
}

template<typename T>
void recv(T* buffer, int length, rank_t src)
{
	MPI_Status status;
	MPI_Recv(buffer, length, get_mpi_type<T>(), src, tag, MPI_COMM_WORLD, &status);
}

template<typename T>
T sum(T value)
{
	T work;
	MPI_Allreduce(&value, &work, 1, get_mpi_type<T>(), MPI_SUM, MPI_COMM_WORLD);
	return work;
}

// Element-wise global sum of an array
template<typename T>
void sum(T* value, T* out, int length)
{
	MPI_Allreduce(value, out, length, get_mpi_type<T>(), MPI_SUM, MPI_COMM_WORLD);
}
</pre>
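
To make the intended usage of the low-level layer concrete, here is a minimal (untested) driver sketch; it assumes it lives in the same translation unit as the code above.

<pre>
// Minimal usage sketch for the low-level layer (not compile-tested):
// every rank contributes its rank number to a global sum, and ranks 0
// and 1 exchange a small message.
int main(int argc, char **argv)
{
	init_machine(argc, argv);

	rank_t me = get_node_rank();
	int nodes = get_num_nodes();

	// Global reduction across all ranks
	int total = sum<int>(me);
	printf("rank %d of %d, sum of ranks = %d\n", me, nodes, total);

	// Simple point-to-point exchange between ranks 0 and 1
	double payload[4] = { 0.0, 1.0, 2.0, 3.0 };
	if (nodes > 1) {
		if (me == 0)
			send<double>(payload, 4, 1);
		else if (me == 1)
			recv<double>(payload, 4, 0);
	}

	synchronize();
	shutdown_machine();
	return 0;
}
</pre>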

== Intermediate layer ==

<pre>
#include <map>
#include <utility>
#include <vector>

typedef int local_idx_t;
typedef int global_idx_t;

// Assumed to be provided by the layout code: which rank owns a given site
rank_t get_site_rank(global_idx_t idx);

class comm_list {
public:
	// Represents a source_index -> dest_index pair
	typedef std::pair<global_idx_t, global_idx_t> comm_entry;

	// Maps a destination rank to a list of comm_entries
	std::map<rank_t, std::vector<comm_entry> > send;

	// Maps a source rank to a list of comm_entries
	std::map<rank_t, std::vector<comm_entry> > recv;

	// All of the intra-process comm_entries
	std::vector<comm_entry> local;

private:
	// Rank of this process
	rank_t rank;

	// Sort one entry into the local, send or recv lists depending on
	// which ranks own the source and destination sites
	void add_entry(const comm_entry& ce) {
		global_idx_t src = ce.first, dest = ce.second;

		rank_t src_rank = get_site_rank(src);
		rank_t dest_rank = get_site_rank(dest);
		if (src_rank == rank && dest_rank == rank)
			local.push_back(ce);
		else if (src_rank == rank)
			send[dest_rank].push_back(ce);
		else if (dest_rank == rank)
			recv[src_rank].push_back(ce);
	}

public:
	struct entry_iterator {
		virtual void operator++() = 0;
		virtual comm_entry operator*() = 0;
		virtual bool end() = 0;
	};

	comm_list(rank_t rank, entry_iterator& iter) : rank(rank) {
		for (; !iter.end(); ++iter)
			add_entry(*iter);
	}

	comm_list(rank_t rank, const std::vector<comm_entry>& l) : rank(rank) {
		for (std::vector<comm_entry>::const_iterator i = l.begin(); i != l.end(); ++i)
			add_entry(*i);
	}
};


template<typename T>
struct linked_buffer {
	typedef T slot_type;
	rank_t peer;               // rank this buffer is exchanged with
	const int length;          // number of slots
	std::vector<T> buffer;     // contiguous staging area handed to MPI
	std::vector<T*> pointers;  // per-slot pointers into the local site storage

	linked_buffer(rank_t peer, int length) :
		peer(peer),
		length(length), buffer(length),
		pointers(length) { }

	// Copy the linked values into the contiguous buffer before a send
	void gather()
	{
		for (int i=0; i<length; i++)
			buffer[i] = *pointers[i];
	}

	// Copy received data back out through the pointers after a recv
	void scatter()
	{
		for (int i=0; i<length; i++)
			*pointers[i] = buffer[i];
	}
};

// TODO: Need to think about this a little more
template<typename T>
struct linked_buffer_set {
	typedef linked_buffer<T> buffer;
	std::map<rank_t, buffer> send;
	std::map<rank_t, buffer> recv;
	std::vector<buffer> local;

	// A fix-up records which pointer slot still has to be pointed at the
	// site with the given global index once the site storage exists.
	typedef std::pair<T**, global_idx_t> fixup;
	std::vector<fixup> fixups;

	linked_buffer_set(const comm_list& comm)
	{
		// Create send buffers
		for (std::map<rank_t, std::vector<comm_list::comm_entry> >::const_iterator process = comm.send.begin();
				process != comm.send.end(); process++) {
			rank_t rank = process->first;
			const std::vector<comm_list::comm_entry>& entries = process->second;
			buffer& buf = send.insert(std::make_pair(rank, buffer(rank, entries.size()))).first->second;

			// Create fix-ups: each slot must eventually point at its source site
			std::vector<comm_list::comm_entry>::const_iterator entry = entries.begin();
			for (int i=0; entry != entries.end(); entry++, i++) {
				global_idx_t src = entry->first;
				fixups.push_back(fixup(&buf.pointers[i], src));
			}
		}

		// Create recv buffers
		for (std::map<rank_t, std::vector<comm_list::comm_entry> >::const_iterator process = comm.recv.begin();
				process != comm.recv.end(); process++) {
			rank_t rank = process->first;
			const std::vector<comm_list::comm_entry>& entries = process->second;
			buffer& buf = recv.insert(std::make_pair(rank, buffer(rank, entries.size()))).first->second;

			// Create fix-ups: each slot must eventually point at its destination site
			std::vector<comm_list::comm_entry>::const_iterator entry = entries.begin();
			for (int i=0; entry != entries.end(); entry++, i++) {
				global_idx_t dest = entry->second;
				fixups.push_back(fixup(&buf.pointers[i], dest));
			}
		}
	}
};
</pre>
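
To show how I picture the pieces fitting together, here is a rough (again, not compile-tested) sketch of building the buffers and resolving the fixup table once site storage exists. The helper site_address() is made up here; whatever the layout layer actually provides for mapping a global index to local storage would take its place.

<pre>
// Sketch of how the fixup table might be consumed and the buffers used.
// site_address() is a placeholder for whatever the layout layer provides
// to map a global site index to the local storage for that site.
double *site_address(global_idx_t idx);

void exchange_example(rank_t my_rank, const std::vector<comm_list::comm_entry>& entries)
{
	// Sort the entries into local/send/recv lists and build the buffers
	comm_list cl(my_rank, entries);
	linked_buffer_set<double> bufs(cl);

	// Resolve the fixups now that site storage exists: point each buffer
	// slot at the local datum it should gather from or scatter into.
	for (std::vector<linked_buffer_set<double>::fixup>::iterator f = bufs.fixups.begin();
			f != bufs.fixups.end(); ++f)
		*(f->first) = site_address(f->second);

	// A send then amounts to gathering into the staging buffer and handing
	// the contiguous array to the low-level layer.
	for (std::map<rank_t, linked_buffer<double> >::iterator b = bufs.send.begin();
			b != bufs.send.end(); ++b) {
		b->second.gather();
		send<double>(&b->second.buffer[0], b->second.length, b->second.peer);
	}
}
</pre>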