Difference between revisions of "MPI Port"

From Gw-qcd-wiki
Jump to: navigation, search
(Add code)
(Describe new code)
 
(5 intermediate revisions by the same user not shown)
Line 2: Line 2:
  
 
Sorry about the formatting. The wiki strangely denied each of my attempts at attaching the files.
 
Sorry about the formatting. The wiki strangely denied each of my attempts at attaching the files.
 +
 +
--[[User:Bgamari|Bgamari]] 19:48, 26 December 2009 (UTC)
 +
 +
I've updated the below code. The intermediate layer now includes a (probably incorrect) implementation of the pointers fixup table scheme described above. Not that this code obviously hasn't been compile-tested, it's just trying to get the concept across. Let me know what you think.
 +
 +
--[[User:Bgamari|Bgamari]] 21:00, 26 December 2009 (UTC)
  
 
== Low-level ==
 
== Low-level ==
<nowiki>
+
<pre>
  
 
#include <mpi/mpi.h>
 
#include <mpi/mpi.h>
  
typedef rank_t int;
+
typedef int rank_t;
  
 
template<typename T>
 
template<typename T>
Line 100: Line 106:
 
}
 
}
  
</nowiki>
+
</pre>
  
 
== Intermediate layer ==
 
== Intermediate layer ==
<nowiki>
+
<pre>#include <map>
 
 
#include <map>
 
 
#include <pair>
 
#include <pair>
  
Line 124: Line 128:
 
std::vector<comm_entry> local;
 
std::vector<comm_entry> local;
  
public comm_list(rank_t rank, const std::vector<comm_entry> l) {
+
private:
for (std::vector<comm_entry>::iterator i=l.begin(); i != l.end(); i++) {
+
void add_entry(comm_entry& ce) {
global_idx_t src=i->first, dest=i->second;
+
global_idx_t src=i->first, dest=i->second;
  
src_rank = get_site_rank(src);
+
src_rank = get_site_rank(src);
dest_rank = get_site_rank(dest)
+
dest_rank = get_site_rank(dest)
if (src_rank == rank && dest_rank == rank)
+
if (src_rank == rank && dest_rank == rank)
local.push_back(*i);
+
local.push_back(*i);
else if (src_rank == rank) {
+
else if (src_rank == rank) {
if (send.find(dest_rank) == send.end())
+
if (send.find(dest_rank) == send.end())
send[dest_rank] = vector<comm_entry>();
+
send[dest_rank] = vector<comm_entry>();
send[dest_rank].append(*i);
+
send[dest_rank].append(*i);
} else if (dest_rank == rank) {
+
} else if (dest_rank == rank) {
if (recv.find(src_rank) == recv.end())
+
if (recv.find(src_rank) == recv.end())
recv[src_rank] = vector<comm_entry>();
+
recv[src_rank] = vector<comm_entry>();
recv[src_rank].append(*i);
+
recv[src_rank].append(*i);
}
 
 
}
 
}
 
}
 
}
  
 +
public:
 +
struct entry_iterator {
 +
virtual void operator++() = 0;
 +
virtual comm_entry operator*() = 0;
 +
virtual bool end() = 0;
 +
};
 +
public comm_list(rank_t rank, entry_iterator& iter) {
 +
for (; !iter.end(); iter++)
 +
add_entry(*iter);
 +
}
 +
 +
comm_list(rank_t rank, const std::vector<comm_entry> l) {
 +
for (std::vector<comm_entry>::iterator i=l.begin(); i != l.end(); i++)
 +
add_entry(*i);
 +
}
 
};
 
};
  
Line 163: Line 181:
 
{
 
{
 
for (int i=0; i<length; i++)
 
for (int i=0; i<length; i++)
buffers[i] = *pointers[i];
+
buffer[i] = *pointers[i];
 
}
 
}
  
Line 169: Line 187:
 
{
 
{
 
for (int i=0; i<length; i++)
 
for (int i=0; i<length; i++)
*pointers[i] = buffers[i];
+
*pointers[i] = buffer[i];
 
}
 
}
 
};
 
};
  
 
// TODO: Need to think about this a little more
 
// TODO: Need to think about this a little more
struct linked_buffer_info {
+
template<typename T>
map<
+
struct linked_buffer_set {
vector<linked_buffer> create_linked_buffers(comm_list comm)
+
typedef linked_buffer<T> buffer;
{
+
std::map<rank_t, buffer> send;
+
std::map<rank_t, buffer> recv;
}
+
std::vector<buffer> local;
 +
 
 +
typedef std::pair<T**, global_idx_t> fixup;
 +
std::vector<fixup> fixups;
 +
 
 +
linked_buffer_set(comm_list comm)
 +
{
 +
std::vector<fixup> fixups;
 +
std::map<rank, buffer> res;
 +
 
 +
// Create send buffers
 +
for (std::map<rank_t, std::vector<comm_entry> >::iterator process=comm.send.begin();
 +
process != comm.send.end(); process++) {
 +
rank_t rank = process->first;
 +
std::vector<comm_entry> entries = process->second;
 +
buffer buf(rank, entries.length);
 +
res[rank] = buf;
 +
 
 +
// Create fix-ups
 +
std::vector<comm_list::comm_entry>::iterator entry = entries.begin;
 +
for (int i=0; entry != entries.end(); entry++, i++) {
 +
global_idx_t src = entry->first;
 +
fixup fixup(&buf.pointers[i], src);
 +
fixups.push_back(fixup);
 +
}
 +
}
 +
 
 +
// Create recv buffers
 +
for (std::map<rank_t, std::vector<comm_entry> >::iterator process=comm.recv.begin();
 +
process != comm.recv.end(); process++) {
 +
rank_t rank = process->first;
 +
std::vector<comm_entry> entries = process->second;
 +
buffer buf(rank, entries.length);
 +
res[rank] = buf;
  
</nowiki>
+
// Create fix-ups
 +
std::vector<comm_list::comm_entry>::iterator entry = entries.begin;
 +
for (int i=0; entry != entries.end(); entry++, i++) {
 +
global_idx_t dest = entry->second;
 +
fixup fixup(&buf.pointers[i], dest);
 +
fixups.push_back(fixup);
 +
}
 +
}
 +
}
 +
};
  
--[[User:Bgamari|Bgamari]] 19:48, 26 December 2009 (UTC)
+
</pre>

Latest revision as of 16:00, 26 December 2009

Here is my very preliminary work on the MPI infrastructure. There were a few deviations from the specification however they are all fairly minor. We still definitely need to talk about the linked_buffer implementation. The implementation as was sketched out in the specification doesn't seem to have any easy means of filling the pointers table after create_linked_buffers has done its work.

Sorry about the formatting. The wiki strangely denied each of my attempts at attaching the files.

--Bgamari 19:48, 26 December 2009 (UTC)

I've updated the below code. The intermediate layer now includes a (probably incorrect) implementation of the pointers fixup table scheme described above. Not that this code obviously hasn't been compile-tested, it's just trying to get the concept across. Let me know what you think.

--Bgamari 21:00, 26 December 2009 (UTC)

Low-level


#include <mpi/mpi.h>

typedef int rank_t;

template<typename T>
MPI_Datatype get_mpi_type() { return 0; }
MPI_Datatype get_mpi_type<char>() { return MPI_CHAR; }
MPI_Datatype get_mpi_type<int>() { return MPI_INT; }
MPI_Datatype get_mpi_type<short>() { return MPI_SHORT; }
MPI_Datatype get_mpi_type<float>() { return MPI_FLOAT; }
MPI_Datatype get_mpi_type<double>() { return MPI_DOUBLE; }

static void mpi_error(MPI_Comm *comm, int *stat, ...)
{
	int len;
	char err_string[MPI_MAX_ERROR_STRING];

	printf("MPI error number: %i\n", *stat);
	MPI_Error_string(*stat, err_string, &len);
	printf("%s\n", err_string);
	terminate(*stat);
}

void init_machine(int argc, char **argv)
{
	int i, flag, *tag_ub;
	MPI_Comm comm;
	MPI_Errhandler errhandler;

	flag = MPI_Init(&argc, &argv);
	comm = MPI_COMM_WORLD;
	if (flag) mpi_error(&comm, &flag);
	flag = MPI_Errhandler_create(mpi_error, &errhandler);
	if (flag) mpi_error(&comm, &flag);
	flag = MPI_Errhandler_set(MPI_COMM_WORLD, errhandler);
	if (flag) mpi_error(&comm, &flag);
}

void shutdown_machine()
{
	MPI_Finalize();
}

int get_num_nodes()
{
	int n;
	MPI_Comm_size(MPI_COMM_WORLD, &n);
	return n;
}

rank_t get_node_rank()
{
	rank_t rank;
	MPI_Comm_rank(MPI_COMM_WORLD, &rank);
	return rank_t;
}

void synchronize()
{
	MPI_Barrier(MPI_COMM_WORLD);
}

void broadcast(T* buffer, int length)
{
	MPI_Bcast(buffer, length, get_mpi_type<T>(), 0, MPI_COMM_WORLD);
}

template<typename T>
void send(T* buffer, int length, rank_t dest)
{
	MPI_Send(buffer, length, get_mpi_type<T>(), dest, tag, MPI_COMM_WORLD);
}

template<typename T>
void recv(T* buffer, int length, rank_t src)
{
	MPI_Status status;
	MPI_Recv(buffer, length, get_mpi_type<T>(), src, tag, MPI_COMM_WORLD, &status);
}

template<typename T>
T sum(T value)
{
	T work;
	MPI_Allreduce(&value, &work, 1, get_mpi_type<T>(), MPI_SUM, MPI_COMM_WORLD);
	return work;
}

template<typename T>
void sum(T* value, T* out, int length)
{
	MPI_Allreduce(&in, out, get_mpi_type<T>(), length, MPI_SUM,, MPI_COMM_WORLD);
}

Intermediate layer

#include <map>
#include <pair>

typedef local_idx_t int;
typedef global_idx_t int;

class comm_list {
	// Represents a source_index -> dest_index pair
	typedef std::pair<global_idx_t, global_idx_t> comm_entry;

	// Maps a destination rank to a list of comm_entries
	std::map<rank_t, std::vector<comm_entry> > send;

	// Maps a source rank to a list of comm_entries
	std::map<rank_t, std::vector<comm_entry> > recv;

	// All of the intra-process comm_entries
	std::vector<comm_entry> local;

private:
	void add_entry(comm_entry& ce) {
		global_idx_t src=i->first, dest=i->second;

		src_rank = get_site_rank(src);
		dest_rank = get_site_rank(dest)
		if (src_rank == rank && dest_rank == rank)
			local.push_back(*i);
		else if (src_rank == rank) {
			if (send.find(dest_rank) == send.end())
				send[dest_rank] = vector<comm_entry>();
			send[dest_rank].append(*i);
		} else if (dest_rank == rank) {
			if (recv.find(src_rank) == recv.end())
				recv[src_rank] = vector<comm_entry>();
			recv[src_rank].append(*i);
		}
	}

public:
	struct entry_iterator {
		virtual void operator++() = 0;
		virtual comm_entry operator*() = 0;
		virtual bool end() = 0;
	};
	public comm_list(rank_t rank, entry_iterator& iter) {
		for (; !iter.end(); iter++)
			add_entry(*iter);
	}

	comm_list(rank_t rank, const std::vector<comm_entry> l) {
		for (std::vector<comm_entry>::iterator i=l.begin(); i != l.end(); i++)
			add_entry(*i);
	}
};


template<typename T>
struct linked_buffer {
	typedef T slot_type;
	rank_t peer;
	const int length;
	shared_ptr<T> buffer;
	std::vector<T*> pointers;

	linked_buffer(rank_t peer, int length) :
		peer(peer),
		length(length), buffer(new T[length]),
		pointers(length) { }

	void gather()
	{
		for (int i=0; i<length; i++)
			buffer[i] = *pointers[i];
	}

	void scatter()
	{
		for (int i=0; i<length; i++)
			*pointers[i] = buffer[i];
	}
};

// TODO: Need to think about this a little more
template<typename T>
struct linked_buffer_set {
	typedef linked_buffer<T> buffer;
	std::map<rank_t, buffer> send;
	std::map<rank_t, buffer> recv;
	std::vector<buffer> local;

	typedef std::pair<T**, global_idx_t> fixup;
	std::vector<fixup> fixups;

	linked_buffer_set(comm_list comm)
	{
		std::vector<fixup> fixups;
		std::map<rank, buffer> res;

		// Create send buffers
		for (std::map<rank_t, std::vector<comm_entry> >::iterator process=comm.send.begin();
				process != comm.send.end(); process++) {
			rank_t rank = process->first;
			std::vector<comm_entry> entries = process->second;
			buffer buf(rank, entries.length);
			res[rank] = buf;

			// Create fix-ups
			std::vector<comm_list::comm_entry>::iterator entry = entries.begin;
			for (int i=0; entry != entries.end(); entry++, i++) {
				global_idx_t src = entry->first;
				fixup fixup(&buf.pointers[i], src);
				fixups.push_back(fixup);
			}
		}

		// Create recv buffers
		for (std::map<rank_t, std::vector<comm_entry> >::iterator process=comm.recv.begin();
				process != comm.recv.end(); process++) {
			rank_t rank = process->first;
			std::vector<comm_entry> entries = process->second;
			buffer buf(rank, entries.length);
			res[rank] = buf;

			// Create fix-ups
			std::vector<comm_list::comm_entry>::iterator entry = entries.begin;
			for (int i=0; entry != entries.end(); entry++, i++) {
				global_idx_t dest = entry->second;
				fixup fixup(&buf.pointers[i], dest);
				fixups.push_back(fixup);
			}
		}
	}
};