@@ -73,6 +73,7 @@ class PipeWriter : public ErrorLogger {
7373 explicit PipeWriter (int pipe) : mWpipe(pipe) {}
7474
7575 void reportOut (const std::string &outmsg, Color c) override {
76+ // TODO: do not unconditionally apply colors
7677 writeToPipe (REPORT_OUT, ::toString (c) + outmsg + ::toString (Color::Reset));
7778 }
7879
@@ -85,17 +86,29 @@ class PipeWriter : public ErrorLogger {
8586 }
8687
8788private:
89+ // TODO: how to log file name in error?
8890 void writeToPipe (PipeSignal type, const std::string &data) const
8991 {
9092 unsigned int len = static_cast <unsigned int >(data.length () + 1 );
9193 char *out = new char [len + 1 + sizeof (len)];
9294 out[0 ] = static_cast <char >(type);
9395 std::memcpy (&(out[1 ]), &len, sizeof (len));
9496 std::memcpy (&(out[1 +sizeof (len)]), data.c_str (), len);
95- if (write (mWpipe , out, len + 1 + sizeof (len)) <= 0 ) {
97+
98+ std::size_t bytes_to_write = len + 1 + sizeof (len);
99+ ssize_t bytes_written = write (mWpipe , out, len + 1 + sizeof (len));
100+ if (bytes_written <= 0 ) {
101+ const int err = errno;
102+ delete[] out;
103+ out = nullptr ;
104+ std::cerr << " #### ThreadExecutor::writeToPipe() error for type " << type << " : " << std::strerror (err) << std::endl;
105+ std::exit (EXIT_FAILURE);
106+ }
107+ // TODO: write until everything is written
108+ if (bytes_written != bytes_to_write) {
96109 delete[] out;
97110 out = nullptr ;
98- std::cerr << " #### ThreadExecutor::writeToPipe, Failed to write to pipe " << std::endl;
111+ std::cerr << " #### ThreadExecutor::writeToPipe() error for type " << type << " : insufficient data written (expected: " << bytes_to_write << " / got: " << bytes_written << " ) " << std::endl;
99112 std::exit (EXIT_FAILURE);
100113 }
101114
@@ -105,63 +118,85 @@ class PipeWriter : public ErrorLogger {
105118 const int mWpipe ;
106119};
107120
108- int ProcessExecutor::handleRead (int rpipe, unsigned int &result)
121+ bool ProcessExecutor::handleRead (int rpipe, unsigned int &result, const std::string& filename )
109122{
123+ std::size_t bytes_to_read;
124+ ssize_t bytes_read;
125+
110126 char type = 0 ;
111- if (read (rpipe, &type, 1 ) <= 0 ) {
127+ bytes_to_read = sizeof (char );
128+ bytes_read = read (rpipe, &type, bytes_to_read);
129+ if (bytes_read <= 0 ) {
112130 if (errno == EAGAIN)
113- return 0 ;
131+ return true ;
132+
133+ // TODO: log details about failure
114134
115135 // need to increment so a missing pipe (i.e. premature exit of forked process) results in an error exitcode
116136 ++result;
117- return -1 ;
137+ return false ;
138+ }
139+ if (bytes_read != bytes_to_read) {
140+ std::cerr << " #### ThreadExecutor::handleRead(" << filename << " ) error (type): insufficient data read (expected: " << bytes_to_read << " / got: " << bytes_read << " )" << std::endl;
141+ std::exit (EXIT_FAILURE);
118142 }
119143
120144 if (type != PipeWriter::REPORT_OUT && type != PipeWriter::REPORT_ERROR && type != PipeWriter::CHILD_END) {
121- std::cerr << " #### ThreadExecutor::handleRead error, type was: " << type << std::endl;
145+ std::cerr << " #### ThreadExecutor::handleRead( " << filename << " ) invalid type " << int ( type) << std::endl;
122146 std::exit (EXIT_FAILURE);
123147 }
124148
125149 unsigned int len = 0 ;
126- if (read (rpipe, &len, sizeof (len)) <= 0 ) {
127- std::cerr << " #### ThreadExecutor::handleRead error, type was:" << type << std::endl;
150+ bytes_to_read = sizeof (len);
151+ bytes_read = read (rpipe, &len, bytes_to_read);
152+ if (bytes_read <= 0 ) {
153+ const int err = errno;
154+ std::cerr << " #### ThreadExecutor::handleRead(" << filename << " ) error (len) for type " << int (type) << " : " << std::strerror (err) << std::endl;
155+ std::exit (EXIT_FAILURE);
156+ }
157+ if (bytes_read != bytes_to_read) {
158+ std::cerr << " #### ThreadExecutor::handleRead(" << filename << " ) error (len) for type" << int (type) << " : insufficient data read (expected: " << bytes_to_read << " / got: " << bytes_read << " )" << std::endl;
128159 std::exit (EXIT_FAILURE);
129160 }
130161
131162 // Don't rely on incoming data being null-terminated.
132163 // Allocate +1 element and null-terminate the buffer.
133164 char *buf = new char [len + 1 ];
134- const ssize_t readIntoBuf = read (rpipe, buf, len);
135- if (readIntoBuf <= 0 ) {
136- std::cerr << " #### ThreadExecutor::handleRead error, type was:" << type << std::endl;
137- std::exit (EXIT_FAILURE);
138- }
139- buf[readIntoBuf] = 0 ;
165+ char *data_start = buf;
166+ bytes_to_read = len;
167+ do {
168+ bytes_read = read (rpipe, data_start, bytes_to_read);
169+ if (bytes_read <= 0 ) {
170+ const int err = errno;
171+ std::cerr << " #### ThreadExecutor::handleRead(" << filename << " ) error (buf) for type" << int (type) << " : " << std::strerror (err) << std::endl;
172+ std::exit (EXIT_FAILURE);
173+ }
174+ bytes_to_read -= bytes_read;
175+ data_start += bytes_read;
176+ } while (bytes_to_read != 0 );
177+ buf[len] = 0 ;
140178
179+ bool res = true ;
141180 if (type == PipeWriter::REPORT_OUT) {
142181 mErrorLogger .reportOut (buf);
143182 } else if (type == PipeWriter::REPORT_ERROR) {
144183 ErrorMessage msg;
145184 try {
146185 msg.deserialize (buf);
147186 } catch (const InternalError& e) {
148- std::cerr << " #### ThreadExecutor::handleRead error, internal error:" << e.errorMessage << std::endl;
187+ std::cerr << " #### ThreadExecutor::handleRead( " << filename << " ) internal error: " << e.errorMessage << std::endl;
149188 std::exit (EXIT_FAILURE);
150189 }
151190
152191 if (hasToLog (msg))
153192 mErrorLogger .reportErr (msg);
154193 } else if (type == PipeWriter::CHILD_END) {
155- std::istringstream iss (buf);
156- unsigned int fileResult = 0 ;
157- iss >> fileResult;
158- result += fileResult;
159- delete[] buf;
160- return -1 ;
194+ result += std::stoi (buf);
195+ res = false ;
161196 }
162197
163198 delete[] buf;
164- return 1 ;
199+ return res ;
165200}
166201
167202bool ProcessExecutor::checkLoadAverage (size_t nchildren)
@@ -274,12 +309,15 @@ unsigned int ProcessExecutor::check()
274309 std::list<int >::iterator rp = rpipes.begin ();
275310 while (rp != rpipes.end ()) {
276311 if (FD_ISSET (*rp, &rfds)) {
277- const int readRes = handleRead (*rp, result);
278- if (readRes == -1 ) {
312+ std::string name;
313+ const std::map<int , std::string>::iterator p = pipeFile.find (*rp);
314+ if (p != pipeFile.end ()) {
315+ name = p->second ;
316+ }
317+ const bool readRes = handleRead (*rp, result, name);
318+ if (!readRes) {
279319 std::size_t size = 0 ;
280- const std::map<int , std::string>::iterator p = pipeFile.find (*rp);
281320 if (p != pipeFile.end ()) {
282- std::string name = p->second ;
283321 pipeFile.erase (p);
284322 const std::map<std::string, std::size_t >::const_iterator fs = mFiles .find (name);
285323 if (fs != mFiles .end ()) {
0 commit comments