Skip to content
Open
60 changes: 27 additions & 33 deletions applications/join/Relation_io_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,13 @@ class MaterializedTupleRef_V1_0_1 {
return o;
}

} GRAPPA_BLOCK_ALIGNED;
std::ostream& operator<< (std::ostream& o, const MaterializedTupleRef_V1_0_1& t) {
return t.dump(o);
}
} GRAPPA_BLOCK_ALIGNED;
std::ostream& operator<< (std::ostream& o, const MaterializedTupleRef_V1_0_1& t) {
return t.dump(o);
}

int64_t other_data __attribute__ ((aligned (2048))) = 0;
std::vector<MaterializedTupleRef_V1_0_1> more_data;
std::vector<MaterializedTupleRef_V1_0_1> data;

BOOST_AUTO_TEST_CASE( test1 ) {
Grappa::init( GRAPPA_TEST_ARGS );
Expand All @@ -83,50 +83,44 @@ BOOST_AUTO_TEST_CASE( test1 ) {

// write to new file
std::string write_file = "write.bin";
MaterializedTupleRef_V1_0_1 one;
MaterializedTupleRef_V1_0_1 two;
one.set(0, 10);
one.set(1, 11);
two.set(0, 12);
two.set(1, 13);
more_data.push_back(one);
more_data.push_back(two);

writeTuplesUnordered<MaterializedTupleRef_V1_0_1>( &more_data, write_file );
for (int i = 0; i < 10; i++) {
MaterializedTupleRef_V1_0_1 a;
a.set(0, i);
a.set(1, i+1);
data.push_back(a);
}
writeTuplesUnordered<MaterializedTupleRef_V1_0_1>( &data, write_file );

// try read
Relation<MaterializedTupleRef_V1_0_1> results =
readTuplesUnordered<MaterializedTupleRef_V1_0_1>( write_file );

BOOST_CHECK_EQUAL( 2, results.numtuples );
BOOST_CHECK_EQUAL( 10, results.numtuples );
MaterializedTupleRef_V1_0_1 expected;
expected.set(0, 10);
expected.set(1, 11);
expected.set(0, 0);
expected.set(1, 1);
BOOST_CHECK_EQUAL( expected.get(0), (*results.data.localize()).get(0) );
BOOST_CHECK_EQUAL( expected.get(1), (*results.data.localize()).get(1) );


// write to existing file
MaterializedTupleRef_V1_0_1 three;
MaterializedTupleRef_V1_0_1 four;
three.set(0, 14);
three.set(1, 15);
four.set(0, 16);
four.set(1, 17);
more_data.clear();
more_data.push_back(three);
more_data.push_back(four);

writeTuplesUnordered<MaterializedTupleRef_V1_0_1>( &more_data, write_file );
// write to existing file, should replace
data.clear();
for (int i = 0; i < 30; i++) {
MaterializedTupleRef_V1_0_1 a;
a.set(0, i);
a.set(1, i);
data.push_back(a);
}
writeTuplesUnordered<MaterializedTupleRef_V1_0_1>( &data, write_file );

// verify write
results =
readTuplesUnordered<MaterializedTupleRef_V1_0_1>( write_file );

BOOST_CHECK_EQUAL( 4, results.numtuples );
BOOST_CHECK_EQUAL( 30, results.numtuples );

expected.set(0, 10);
expected.set(1, 11);
expected.set(0, 0);
expected.set(1, 0);
BOOST_CHECK_EQUAL( expected.get(0), (*results.data.localize()).get(0) );
BOOST_CHECK_EQUAL( expected.get(1), (*results.data.localize()).get(1) );

Expand Down
77 changes: 66 additions & 11 deletions applications/join/relation_io.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ namespace fs = boost::filesystem;
#include "relation.hpp"

#include "grappa/graph.hpp"

#include <unistd.h>
#include <cstdio>
DECLARE_string(relations);
DECLARE_bool(bin);

Expand Down Expand Up @@ -149,7 +150,7 @@ size_t readTuplesUnordered( std::string fn, GlobalAddress<T> * buf_addr, int64_t

auto tuples = Grappa::global_alloc<T>(ntuples);

size_t offset_counter;
size_t offset_counter = 0;
auto offset_counter_addr = make_global( &offset_counter, Grappa::mycore() );

// we will broadcast the file name as bytes
Expand Down Expand Up @@ -210,6 +211,39 @@ Relation<T> readTuplesUnordered( std::string fn ) {
return r;
}

/// helper function to write to a range of a file with POSIX range locking
static void write_locked_range( const char * filename, size_t offset, const char * buf, size_t size ) {
int fd;

PCHECK( (fd = open( filename, O_WRONLY )) >= 0 );

// lock range in file
struct flock lock;
lock.l_type = F_WRLCK;
lock.l_whence = SEEK_SET;
lock.l_start = offset;
lock.l_len = size;

VLOG(4) << "acquiring lock";
PCHECK( fcntl( fd, F_SETLKW, &lock ) >=0 ) << "start " << offset << " end " << offset + size;
VLOG(4) << "acquiried lock";

// seek to range
int64_t new_offset = 0;
PCHECK( (new_offset = lseek( fd, offset , SEEK_SET )) >= 0 );
CHECK_EQ( new_offset, offset );

// write
PCHECK( write( fd, buf, size ) >= 0 ) << "addr " << ((void*)buf);

// release lock on file range
lock.l_type = F_UNLCK;
PCHECK( fcntl( fd, F_SETLK, &lock ) >= 0 ) << "Could not release record lock on file";
VLOG(4) << "unlock";
// close file
PCHECK( close( fd ) >= 0 );
}

// assumes that for object T, the address of T is the address of its fields
template <typename T>
void writeTuplesUnordered(std::vector<T> * vec, std::string fn ) {
Expand All @@ -220,29 +254,50 @@ void writeTuplesUnordered(std::vector<T> * vec, std::string fn ) {
char data_path_char[2048];
sprintf(data_path_char, "%s", data_path.c_str());

size_t offset_counter = 0;
auto offset_counter_addr = make_global( &offset_counter, Grappa::mycore() );

// removes the file if it already exists
std::ifstream f(data_path_char);
if (f.is_open()) {
f.close();
remove(data_path_char);
}

// write_locked_range works only when file already exists
std::ofstream outfile(data_path_char);
outfile.close();

on_all_cores( [=] {
VLOG(5) << "opening addr next";
VLOG(5) << "opening addr " << &data_path_char;
VLOG(5) << "opening " << data_path_char;
T dummy;
int64_t row_offset = Grappa::delegate::fetch_and_add( offset_counter_addr, vec->size() );

std::ofstream data_file(data_path_char, std::ios_base::out | std::ios_base::app | std::ios_base::binary);
CHECK( data_file.is_open() ) << data_path_char << " failed to open";
VLOG(5) << "writing";
// transform vector tuples into actual values of size 64
int64_t * tuples = new int64_t[vec->size() * dummy.numFields()];

for (auto it = vec->begin(); it < vec->end(); it++) {
int i = 0;
for (auto it = vec->begin(); it != vec->end(); it++) {
for (int j = 0; j < it->numFields(); j++) {
int64_t val = it->get(j);
data_file.write((char*)&val, sizeof(val));
tuples[i] = val;
i++;
}
}

data_file.close();
});
VLOG(5) << "writing";
VLOG(4) << "tuples addr " << &tuples[0];
write_locked_range(data_path_char, row_offset * sizeof(int64_t) * dummy.numFields(),
(char*)tuples, vec->size() * dummy.numFields() * sizeof(int64_t));

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: instead of write_locked_range writing the entire array at once, use MaterializedTuple.toOstream. Get an ostream from the fd

delete tuples;
});
}

// writes names and types to filename fn in ASCII separated by a newline
void writeSchema(std::string names, std::string types, std::string fn ) {
std::string data_path = FLAGS_relations+"/"+fn;

CHECK( data_path.size() <= 2040 );
char data_path_char[2048];
sprintf(data_path_char, "%s", data_path.c_str());
Expand All @@ -266,7 +321,7 @@ template< typename N=int64_t, typename Parser=decltype(toInt) >
void convert2bin( std::string fn, Parser parser=&toInt, char * separators=" ", uint64_t burn=0 ) {
std::ifstream infile(fn, std::ifstream::in);
CHECK( infile.is_open() ) << fn << " failed to open";

std::string outpath = fn+".bin";
std::ofstream outfile(outpath, std::ios_base::out | std::ios_base::binary );
CHECK( outfile.is_open() ) << outpath << " failed to open";
Expand Down