From 7d64e20ae8f30df64f5fb5b51535f1f8cdcec59f Mon Sep 17 00:00:00 2001 From: York Wei Date: Fri, 29 Aug 2014 10:51:03 -0700 Subject: [PATCH 1/9] trying parallel file writing w/ write lock --- applications/join/relation_io.hpp | 52 ++++++++++++++++++++++++------- 1 file changed, 40 insertions(+), 12 deletions(-) diff --git a/applications/join/relation_io.hpp b/applications/join/relation_io.hpp index e342611bf..9c1244322 100644 --- a/applications/join/relation_io.hpp +++ b/applications/join/relation_io.hpp @@ -17,7 +17,8 @@ namespace fs = boost::filesystem; #include "relation.hpp" #include "grappa/graph.hpp" - +#include +#include DECLARE_string(relations); DECLARE_bool(bin); @@ -209,6 +210,35 @@ Relation readTuplesUnordered( std::string fn ) { Relation r = { tuples, ntuples }; return r; } +/// helper function to write to a range of a file with POSIX range locking +static void write_locked_range( const char * filename, size_t offset, const char * buf, size_t size ) { + int fd; + + PCHECK( (fd = open( filename, O_WRONLY )) >= 0 ); + + // lock range in file + struct flock lock; + lock.l_type = F_WRLCK; + lock.l_whence = SEEK_SET; + lock.l_start = offset; + lock.l_len = size; + PCHECK( fcntl( fd, F_SETLK, &lock ) >= 0 ) << "Could not obtain record lock on file"; + + // seek to range + int64_t new_offset = 0; + PCHECK( (new_offset = lseek( fd, offset , SEEK_SET )) >= 0 ); + CHECK_EQ( new_offset, offset ); + + // write + PCHECK( write( fd, buf, size ) >= 0 ); + + // release lock on file range + lock.l_type = F_UNLCK; + PCHECK( fcntl( fd, F_SETLK, &lock ) >= 0 ) << "Could not release record lock on file"; + + // close file + PCHECK( close( fd ) >= 0 ); +} // assumes that for object T, the address of T is the address of its fields template @@ -220,26 +250,24 @@ void writeTuplesUnordered(std::vector * vec, std::string fn ) { char data_path_char[2048]; sprintf(data_path_char, "%s", data_path.c_str()); + size_t offset_counter; + auto offset_counter_addr = make_global( &offset_counter, Grappa::mycore() ); + on_all_cores( [=] { VLOG(5) << "opening addr next"; VLOG(5) << "opening addr " << &data_path_char; VLOG(5) << "opening " << data_path_char; + T dummy; + int64_t row_offset = Grappa::delegate::fetch_and_add( offset_counter_addr, vec->size() ); - std::ofstream data_file(data_path_char, std::ios_base::out | std::ios_base::app | std::ios_base::binary); - CHECK( data_file.is_open() ) << data_path_char << " failed to open"; VLOG(5) << "writing"; - - for (auto it = vec->begin(); it < vec->end(); it++) { - for (int j = 0; j < it->numFields(); j++) { - int64_t val = it->get(j); - data_file.write((char*)&val, sizeof(val)); - } - } - - data_file.close(); + LOG(INFO) << vec->size() * dummy.numFields()* sizeof(int64_t); + write_locked_range(data_path_char, row_offset * sizeof(int64_t) * dummy.numFields(), (char*)&vec[0], + vec->size() * dummy.numFields()* sizeof(int64_t)); }); } + void writeSchema(std::string names, std::string types, std::string fn ) { std::string data_path = FLAGS_relations+"/"+fn; From 58c82921c09eebe84d6bf047edbd4af5a5289353 Mon Sep 17 00:00:00 2001 From: York Wei Date: Fri, 29 Aug 2014 11:31:03 -0700 Subject: [PATCH 2/9] can open file, write correct size, wrong info --- applications/join/relation_io.hpp | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/applications/join/relation_io.hpp b/applications/join/relation_io.hpp index 9c1244322..48bf0d69b 100644 --- a/applications/join/relation_io.hpp +++ b/applications/join/relation_io.hpp @@ -210,6 +210,7 @@ Relation readTuplesUnordered( std::string fn ) { Relation r = { tuples, ntuples }; return r; } + /// helper function to write to a range of a file with POSIX range locking static void write_locked_range( const char * filename, size_t offset, const char * buf, size_t size ) { int fd; @@ -252,7 +253,15 @@ void writeTuplesUnordered(std::vector * vec, std::string fn ) { size_t offset_counter; auto offset_counter_addr = make_global( &offset_counter, Grappa::mycore() ); - + + std::ifstream f(data_path_char); + if (f.good()) { + f.close(); + remove(data_path_char); + std::ofstream outfile(data_path_char); + outfile.close(); + } + on_all_cores( [=] { VLOG(5) << "opening addr next"; VLOG(5) << "opening addr " << &data_path_char; @@ -263,7 +272,7 @@ void writeTuplesUnordered(std::vector * vec, std::string fn ) { VLOG(5) << "writing"; LOG(INFO) << vec->size() * dummy.numFields()* sizeof(int64_t); write_locked_range(data_path_char, row_offset * sizeof(int64_t) * dummy.numFields(), (char*)&vec[0], - vec->size() * dummy.numFields()* sizeof(int64_t)); + vec->size() * dummy.numFields()* sizeof(int64_t)); }); } From 85bca6f57557b905a6816a8a6adb6e8aad3c9264 Mon Sep 17 00:00:00 2001 From: York Wei Date: Fri, 29 Aug 2014 12:42:07 -0700 Subject: [PATCH 3/9] create file to write to, write actual tuple values --- applications/join/relation_io.hpp | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/applications/join/relation_io.hpp b/applications/join/relation_io.hpp index 48bf0d69b..b39cf6c6e 100644 --- a/applications/join/relation_io.hpp +++ b/applications/join/relation_io.hpp @@ -255,12 +255,12 @@ void writeTuplesUnordered(std::vector * vec, std::string fn ) { auto offset_counter_addr = make_global( &offset_counter, Grappa::mycore() ); std::ifstream f(data_path_char); - if (f.good()) { + if (f.is_open()) { f.close(); remove(data_path_char); - std::ofstream outfile(data_path_char); - outfile.close(); } + std::ofstream outfile(data_path_char); + outfile.close(); on_all_cores( [=] { VLOG(5) << "opening addr next"; @@ -268,10 +268,18 @@ void writeTuplesUnordered(std::vector * vec, std::string fn ) { VLOG(5) << "opening " << data_path_char; T dummy; int64_t row_offset = Grappa::delegate::fetch_and_add( offset_counter_addr, vec->size() ); + int64_t tuples[vec->size()]; + int i = 0; + for (auto it = vec->begin(); it != vec->end(); it++) { + for (int j = 0; j < it->numFields(); j++) { + int64_t val = it->get(j); + tuples[i] = val; + i++; + } + } VLOG(5) << "writing"; - LOG(INFO) << vec->size() * dummy.numFields()* sizeof(int64_t); - write_locked_range(data_path_char, row_offset * sizeof(int64_t) * dummy.numFields(), (char*)&vec[0], + write_locked_range(data_path_char, row_offset * sizeof(int64_t) * dummy.numFields(), (char*)&tuples[0], vec->size() * dummy.numFields()* sizeof(int64_t)); }); } From b310417f638948ba3e489dace0efe3c245add8c3 Mon Sep 17 00:00:00 2001 From: York Wei Date: Fri, 29 Aug 2014 14:35:33 -0700 Subject: [PATCH 4/9] changing tests since new files are created each time --- applications/join/Relation_io_tests.cpp | 52 +++++++++++-------------- 1 file changed, 23 insertions(+), 29 deletions(-) diff --git a/applications/join/Relation_io_tests.cpp b/applications/join/Relation_io_tests.cpp index 119e13079..fc3ad4baa 100644 --- a/applications/join/Relation_io_tests.cpp +++ b/applications/join/Relation_io_tests.cpp @@ -73,7 +73,7 @@ class MaterializedTupleRef_V1_0_1 { } int64_t other_data __attribute__ ((aligned (2048))) = 0; -std::vector more_data; +std::vector data; BOOST_AUTO_TEST_CASE( test1 ) { Grappa::init( GRAPPA_TEST_ARGS ); @@ -83,50 +83,44 @@ BOOST_AUTO_TEST_CASE( test1 ) { // write to new file std::string write_file = "write.bin"; - MaterializedTupleRef_V1_0_1 one; - MaterializedTupleRef_V1_0_1 two; - one.set(0, 10); - one.set(1, 11); - two.set(0, 12); - two.set(1, 13); - more_data.push_back(one); - more_data.push_back(two); - - writeTuplesUnordered( &more_data, write_file ); + for (int i = 0; i < 10; i++) { + MaterializedTupleRef_V1_0_1 a; + a.set(0, i); + a.set(1, i+1); + data.push_back(a); + } + writeTuplesUnordered( &data, write_file ); // try read Relation results = readTuplesUnordered( write_file ); - BOOST_CHECK_EQUAL( 2, results.numtuples ); + BOOST_CHECK_EQUAL( 10, results.numtuples ); MaterializedTupleRef_V1_0_1 expected; - expected.set(0, 10); - expected.set(1, 11); + expected.set(0, 0); + expected.set(1, 1); BOOST_CHECK_EQUAL( expected.get(0), (*results.data.localize()).get(0) ); BOOST_CHECK_EQUAL( expected.get(1), (*results.data.localize()).get(1) ); - // write to existing file - MaterializedTupleRef_V1_0_1 three; - MaterializedTupleRef_V1_0_1 four; - three.set(0, 14); - three.set(1, 15); - four.set(0, 16); - four.set(1, 17); - more_data.clear(); - more_data.push_back(three); - more_data.push_back(four); - - writeTuplesUnordered( &more_data, write_file ); + // write to existing file, should replace + data.clear(); + for (int i = 0; i < 30; i++) { + MaterializedTupleRef_V1_0_1 a; + a.set(0, i); + a.set(1, i); + data.push_back(a); + } + writeTuplesUnordered( &data, write_file ); // verify write results = readTuplesUnordered( write_file ); - BOOST_CHECK_EQUAL( 4, results.numtuples ); + BOOST_CHECK_EQUAL( 30, results.numtuples ); - expected.set(0, 10); - expected.set(1, 11); + expected.set(0, 0); + expected.set(1, 0); BOOST_CHECK_EQUAL( expected.get(0), (*results.data.localize()).get(0) ); BOOST_CHECK_EQUAL( expected.get(1), (*results.data.localize()).get(1) ); From 2704d769d01e3b5801c1786e1a4b7036947e5813 Mon Sep 17 00:00:00 2001 From: York Wei Date: Fri, 29 Aug 2014 14:37:58 -0700 Subject: [PATCH 5/9] write the tuples correctly in parallel, vector values to array first --- applications/join/relation_io.hpp | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/applications/join/relation_io.hpp b/applications/join/relation_io.hpp index b39cf6c6e..b26e62d0a 100644 --- a/applications/join/relation_io.hpp +++ b/applications/join/relation_io.hpp @@ -254,11 +254,13 @@ void writeTuplesUnordered(std::vector * vec, std::string fn ) { size_t offset_counter; auto offset_counter_addr = make_global( &offset_counter, Grappa::mycore() ); + // removes the file if it already exists std::ifstream f(data_path_char); if (f.is_open()) { f.close(); remove(data_path_char); } + // write_locked_range works only when file already existsp std::ofstream outfile(data_path_char); outfile.close(); @@ -268,7 +270,9 @@ void writeTuplesUnordered(std::vector * vec, std::string fn ) { VLOG(5) << "opening " << data_path_char; T dummy; int64_t row_offset = Grappa::delegate::fetch_and_add( offset_counter_addr, vec->size() ); - int64_t tuples[vec->size()]; + + // transform vector tuples into actual values of size 64 + int64_t tuples[vec->size() * dummy.numFields()]; int i = 0; for (auto it = vec->begin(); it != vec->end(); it++) { for (int j = 0; j < it->numFields(); j++) { @@ -280,7 +284,7 @@ void writeTuplesUnordered(std::vector * vec, std::string fn ) { VLOG(5) << "writing"; write_locked_range(data_path_char, row_offset * sizeof(int64_t) * dummy.numFields(), (char*)&tuples[0], - vec->size() * dummy.numFields()* sizeof(int64_t)); + vec->size() * dummy.numFields() * sizeof(int64_t)); }); } From 305f05b4c6c3c34b4b0bf93ec3452552dc6fb9c4 Mon Sep 17 00:00:00 2001 From: York Wei Date: Fri, 29 Aug 2014 17:28:19 -0700 Subject: [PATCH 6/9] fix lock grab --- applications/join/relation_io.hpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/applications/join/relation_io.hpp b/applications/join/relation_io.hpp index b26e62d0a..b6acef2d2 100644 --- a/applications/join/relation_io.hpp +++ b/applications/join/relation_io.hpp @@ -223,7 +223,9 @@ static void write_locked_range( const char * filename, size_t offset, const char lock.l_whence = SEEK_SET; lock.l_start = offset; lock.l_len = size; - PCHECK( fcntl( fd, F_SETLK, &lock ) >= 0 ) << "Could not obtain record lock on file"; + while ( fcntl( fd, F_SETLK, &lock ) < 0 ) { + usleep(2); + } // seek to range int64_t new_offset = 0; @@ -291,7 +293,6 @@ void writeTuplesUnordered(std::vector * vec, std::string fn ) { void writeSchema(std::string names, std::string types, std::string fn ) { std::string data_path = FLAGS_relations+"/"+fn; - CHECK( data_path.size() <= 2040 ); char data_path_char[2048]; sprintf(data_path_char, "%s", data_path.c_str()); From 873f8e378fe319e0f00f67fdb859a950298c7d11 Mon Sep 17 00:00:00 2001 From: York Wei Date: Tue, 2 Sep 2014 11:12:32 -0700 Subject: [PATCH 7/9] comments, small fixes --- applications/join/Relation_io_tests.cpp | 8 ++++---- applications/join/relation_io.hpp | 13 +++++++------ 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/applications/join/Relation_io_tests.cpp b/applications/join/Relation_io_tests.cpp index fc3ad4baa..53e29ed04 100644 --- a/applications/join/Relation_io_tests.cpp +++ b/applications/join/Relation_io_tests.cpp @@ -67,10 +67,10 @@ class MaterializedTupleRef_V1_0_1 { return o; } - } GRAPPA_BLOCK_ALIGNED; - std::ostream& operator<< (std::ostream& o, const MaterializedTupleRef_V1_0_1& t) { - return t.dump(o); - } +} GRAPPA_BLOCK_ALIGNED; +std::ostream& operator<< (std::ostream& o, const MaterializedTupleRef_V1_0_1& t) { + return t.dump(o); +} int64_t other_data __attribute__ ((aligned (2048))) = 0; std::vector data; diff --git a/applications/join/relation_io.hpp b/applications/join/relation_io.hpp index b6acef2d2..39b98acd9 100644 --- a/applications/join/relation_io.hpp +++ b/applications/join/relation_io.hpp @@ -262,7 +262,8 @@ void writeTuplesUnordered(std::vector * vec, std::string fn ) { f.close(); remove(data_path_char); } - // write_locked_range works only when file already existsp + + // write_locked_range works only when file already exists std::ofstream outfile(data_path_char); outfile.close(); @@ -285,12 +286,12 @@ void writeTuplesUnordered(std::vector * vec, std::string fn ) { } VLOG(5) << "writing"; - write_locked_range(data_path_char, row_offset * sizeof(int64_t) * dummy.numFields(), (char*)&tuples[0], - vec->size() * dummy.numFields() * sizeof(int64_t)); - }); + write_locked_range(data_path_char, row_offset * sizeof(int64_t) * dummy.numFields(), + (char*)&tuples[0], vec->size() * dummy.numFields() * sizeof(int64_t)); + }); } - +// writes names and types to filename fn in ASCII separated by a newline void writeSchema(std::string names, std::string types, std::string fn ) { std::string data_path = FLAGS_relations+"/"+fn; CHECK( data_path.size() <= 2040 ); @@ -316,7 +317,7 @@ template< typename N=int64_t, typename Parser=decltype(toInt) > void convert2bin( std::string fn, Parser parser=&toInt, char * separators=" ", uint64_t burn=0 ) { std::ifstream infile(fn, std::ifstream::in); CHECK( infile.is_open() ) << fn << " failed to open"; - + std::string outpath = fn+".bin"; std::ofstream outfile(outpath, std::ios_base::out | std::ios_base::binary ); CHECK( outfile.is_open() ) << outpath << " failed to open"; From 453ec129d024cfdbe0edc70bc724c0f472a6ba80 Mon Sep 17 00:00:00 2001 From: York Wei Date: Wed, 3 Sep 2014 12:29:22 -0700 Subject: [PATCH 8/9] fixing tuples for writing --- applications/join/relation_io.hpp | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/applications/join/relation_io.hpp b/applications/join/relation_io.hpp index 39b98acd9..7e22592df 100644 --- a/applications/join/relation_io.hpp +++ b/applications/join/relation_io.hpp @@ -223,9 +223,10 @@ static void write_locked_range( const char * filename, size_t offset, const char lock.l_whence = SEEK_SET; lock.l_start = offset; lock.l_len = size; - while ( fcntl( fd, F_SETLK, &lock ) < 0 ) { - usleep(2); - } + + VLOG(4) << "acquiring lock"; + PCHECK( fcntl( fd, F_SETLKW, &lock ) >=0 ) << "start " << offset << " end " << offset + size; + VLOG(4) << "acquiried lock"; // seek to range int64_t new_offset = 0; @@ -233,12 +234,12 @@ static void write_locked_range( const char * filename, size_t offset, const char CHECK_EQ( new_offset, offset ); // write - PCHECK( write( fd, buf, size ) >= 0 ); + PCHECK( write( fd, buf, size ) >= 0 ) << "addr " << ((void*)buf); // release lock on file range lock.l_type = F_UNLCK; PCHECK( fcntl( fd, F_SETLK, &lock ) >= 0 ) << "Could not release record lock on file"; - + VLOG(4) << "unlock"; // close file PCHECK( close( fd ) >= 0 ); } @@ -275,7 +276,8 @@ void writeTuplesUnordered(std::vector * vec, std::string fn ) { int64_t row_offset = Grappa::delegate::fetch_and_add( offset_counter_addr, vec->size() ); // transform vector tuples into actual values of size 64 - int64_t tuples[vec->size() * dummy.numFields()]; + int64_t * tuples = new int64_t[vec->size() * dummy.numFields()]; + int i = 0; for (auto it = vec->begin(); it != vec->end(); it++) { for (int j = 0; j < it->numFields(); j++) { @@ -286,8 +288,10 @@ void writeTuplesUnordered(std::vector * vec, std::string fn ) { } VLOG(5) << "writing"; + VLOG(4) << "tuples addr " << &tuples[0]; write_locked_range(data_path_char, row_offset * sizeof(int64_t) * dummy.numFields(), - (char*)&tuples[0], vec->size() * dummy.numFields() * sizeof(int64_t)); + (char*)tuples, vec->size() * dummy.numFields() * sizeof(int64_t)); + delete tuples; }); } From aadad2f02ee312fb69922b12bfdf6eb0ea4d2146 Mon Sep 17 00:00:00 2001 From: Brandon Myers Date: Mon, 29 Sep 2014 00:52:21 -0700 Subject: [PATCH 9/9] fix non initialized variable --- applications/join/relation_io.hpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/applications/join/relation_io.hpp b/applications/join/relation_io.hpp index 7e22592df..ec1a479a0 100644 --- a/applications/join/relation_io.hpp +++ b/applications/join/relation_io.hpp @@ -150,7 +150,7 @@ size_t readTuplesUnordered( std::string fn, GlobalAddress * buf_addr, int64_t auto tuples = Grappa::global_alloc(ntuples); - size_t offset_counter; + size_t offset_counter = 0; auto offset_counter_addr = make_global( &offset_counter, Grappa::mycore() ); // we will broadcast the file name as bytes @@ -254,7 +254,7 @@ void writeTuplesUnordered(std::vector * vec, std::string fn ) { char data_path_char[2048]; sprintf(data_path_char, "%s", data_path.c_str()); - size_t offset_counter; + size_t offset_counter = 0; auto offset_counter_addr = make_global( &offset_counter, Grappa::mycore() ); // removes the file if it already exists