gprofng: fix data race
authorVladimir Mezentsev <vladimir.mezentsev@oracle.com>
Thu, 29 Jun 2023 20:11:09 +0000 (13:11 -0700)
committerVladimir Mezentsev <vladimir.mezentsev@oracle.com>
Sat, 1 Jul 2023 15:33:11 +0000 (08:33 -0700)
In our GUI project (https://savannah.gnu.org/projects/gprofng-gui), we use
the output of gprofng to display the data. Sometimes this data is corrupted.

gprofng/ChangeLog
2023-06-29  Vladimir Mezentsev  <vladimir.mezentsev@oracle.com>

* src/ipc.cc (ipc_doWork): Fix data race.
* src/ipcio.cc (IPCresponse::print): Fix data race.
Remove unused variables and functions.
* src/ipcio.h: Declare two variables.
* src/StringBuilder.cc (StringBuilder::write): New function.
* src/StringBuilder.h: Likewise.

gprofng/src/StringBuilder.cc
gprofng/src/StringBuilder.h
gprofng/src/ipc.cc
gprofng/src/ipcio.cc
gprofng/src/ipcio.h

index a806261d02699a353a21af292f0d2ae7e70f6b5e..f312866bd23436977a783403f2b959f548ebabaf 100644 (file)
@@ -24,6 +24,7 @@
 #include <string.h>
 #include <values.h>
 #include <stdarg.h>
+#include <unistd.h>
 
 #include "gp-defs.h"
 #include "StringBuilder.h"
@@ -447,6 +448,13 @@ StringBuilder::toFileLn (FILE *fp)
   fprintf (fp, NTXT ("%s\n"), value);
 }
 
+void
+StringBuilder::write (int fd)
+{
+  if (count > 0)
+    ::write (fd, value, count);
+}
+
 StringBuilder *
 StringBuilder::sprintf (const char *fmt, ...)
 {
index cb7127bc1371af0961e38e4e0d91c6228a45af54..8db90c51239722f0f47263f442e474bc55819566 100644 (file)
@@ -82,6 +82,7 @@ public:
   char *toString ();
   void toFile (FILE *fp);
   void toFileLn (FILE *fp);
+  void write (int fd);
 
   // Not in Java
   StringBuilder *appendf (const char *fmt, ...) __attribute__ ((format (printf, 2, 3)));
index 3cf6b8f0eccb6fb0bbb353db598a48658f72b734..d0f15d3813fe87bccf645b7b469f37b93aa7dee9 100644 (file)
@@ -189,11 +189,10 @@ sigterm_handler (int, siginfo_t *, void *)
 static const char *ipc_log_name = NULL;
 static const char *ipc_request_log_name = NULL;
 static const char *ipc_response_log_name = NULL;
-FILE *requestLogFileP = stderr;
-FILE *responseLogFileP = stderr;
-hrtime_t begin_time;
-long long delta_time = 0;
-int ipc_delay_microsec = 0;
+static FILE *requestLogFileP = stderr;
+static FILE *responseLogFileP = stderr;
+static hrtime_t begin_time;
+static long long delta_time = 0;
 
 void
 ipc_default_log (const char *fmt, ...)
@@ -362,7 +361,7 @@ ipc_doWork (void *arg)
       ipc_log ("NULL ipc command received, exiting\n");
       return 0;
     }
-  ipc_log ("ipc: %s Req %x Ch %x\n", inp, currentRequestID, currentChannelID);
+  ipc_log ("ipc: %s Req %x Ch %x\n", inp, req->getRequestID (), req->getChannelID ());
   checkCancellableOp (inp, req);
   if (!strcmp (inp, "initApplication"))
     {
@@ -788,7 +787,7 @@ ipc_doWork (void *arg)
       int dbevindex = readInt (req);
       int cmp_mode = readInt (req);
       getView (dbevindex)->set_compare_mode (cmp_mode);
-      writeResponseGeneric (RESPONSE_STATUS_SUCCESS, currentRequestID, currentChannelID);
+      writeResponseGeneric (RESPONSE_STATUS_SUCCESS, req->getRequestID (), req->getChannelID ());
     }
   else if (!strcmp (inp, "getCompareModeV2"))
     {
@@ -811,7 +810,7 @@ ipc_doWork (void *arg)
       int cmp_mode = readInt (req);
       MetricList *mlist = readMetricListV2 (dbevindex, req);
       getView (dbevindex)->reset_metric_list (mlist, cmp_mode);
-      writeResponseGeneric (RESPONSE_STATUS_SUCCESS, currentRequestID, currentChannelID);
+      writeResponseGeneric (RESPONSE_STATUS_SUCCESS, req->getRequestID (), req->getChannelID ());
     }
   else if (!strcmp (inp, "getCurMetricsV2"))
     {
@@ -2429,7 +2428,7 @@ ipc_doWork (void *arg)
       dbe_archive (ids, locations);
       delete ids;
       destroy (locations);
-      writeResponseGeneric (RESPONSE_STATUS_SUCCESS, currentRequestID, currentChannelID);
+      writeResponseGeneric (RESPONSE_STATUS_SUCCESS, req->getRequestID (), req->getChannelID ());
     }
   else if (strcmp (inp, "dbeSetLocations") == 0)
     {
@@ -2438,7 +2437,7 @@ ipc_doWork (void *arg)
       dbeSetLocations (fnames, locations);
       destroy (fnames);
       destroy (locations);
-      writeResponseGeneric (RESPONSE_STATUS_SUCCESS, currentRequestID, currentChannelID);
+      writeResponseGeneric (RESPONSE_STATUS_SUCCESS, req->getRequestID (), req->getChannelID ());
     }
   else if (strcmp (inp, "dbeResolvedWith_setpath") == 0)
     {
index 9a6b7afc22ddf643ca00971b3be51a20df63afcc..54648cdcbcc17544fa73effcd7254653161f1e03 100644 (file)
@@ -52,13 +52,6 @@ static const int L_CHAR     = 8;
 
 int currentRequestID;
 int currentChannelID;
-static long maxSize;
-
-extern int cancellableChannelID;
-extern int error_flag;
-extern int ipc_delay_microsec;
-extern FILE *responseLogFileP;
-
 IPCresponse *IPCresponseGlobal;
 
 BufferPool *responseBufferPool;
@@ -624,52 +617,6 @@ IPCresponse::sendAVal (void *ptr)
     }
 }
 
-static void
-writeResponseHeader (int requestID, int responseType, int responseStatus, int nBytes)
-{
-  if (responseType == RESPONSE_TYPE_HANDSHAKE)
-    nBytes = IPC_VERSION_NUMBER;
-  int use_write = 2;
-  ipc_response_trace (TRACE_LVL_1, "ResponseHeaderBegin----- %x ---- %x ----- %x -----%x -------\n", requestID, responseType, responseStatus, nBytes);
-  if (use_write)
-    {
-      char buf[23];
-      if (use_write == 1)
-       {
-         int i = 0;
-         snprintf (buf + i, 3, "%2x", HEADER_MARKER);
-         i += 2;
-         snprintf (buf + i, 9, "%8x", requestID);
-         i += 8;
-         snprintf (buf + i, 3, "%2x", responseType);
-         i += 2;
-         snprintf (buf + i, 3, "%2x", responseStatus);
-         i += 2;
-         snprintf (buf + i, 9, "%8x", nBytes);
-       }
-      else
-       snprintf (buf, 23, "%02x%08x%02x%02x%08x", HEADER_MARKER, requestID,
-                 responseType, responseStatus, nBytes);
-      buf[22] = 0;
-      write (1, buf, 22);
-    }
-  else
-    {
-      cout << setfill ('0') << setw (2) << hex << HEADER_MARKER;
-      cout << setfill ('0') << setw (8) << hex << requestID;
-      cout << setfill ('0') << setw (2) << hex << responseType;
-      cout << setfill ('0') << setw (2) << hex << responseStatus;
-      cout << setfill ('0') << setw (8) << hex << nBytes;
-      cout.flush ();
-    }
-  ipc_response_trace (TRACE_LVL_1, "----------------------------ResponseHeaderEnd\n");
-  if (nBytes > maxSize)
-    {
-      maxSize = nBytes;
-      ipc_trace ("New maxsize %ld\n", maxSize);
-    }
-}
-
 bool
 cancelNeeded (int chID)
 {
@@ -698,12 +645,6 @@ writeResponseWithHeader (int requestID, int channelID, int responseType,
   responseBufferPool->recycle (os);
 }
 
-void
-writeAckFast (int requestID)
-{
-  writeResponseHeader (requestID, RESPONSE_TYPE_ACK, RESPONSE_STATUS_SUCCESS, 0);
-}
-
 void
 writeAck (int requestID, int channelID)
 {
@@ -731,7 +672,6 @@ writeHandshake (int requestID, int channelID)
 {
   IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_SMALL);
   writeResponseWithHeader (requestID, channelID, RESPONSE_TYPE_HANDSHAKE, RESPONSE_STATUS_SUCCESS, OUTS);
-  // writeResponseHeader(requestID, RESPONSE_TYPE_HANDSHAKE, RESPONSE_STATUS_SUCCESS, IPC_VERSION_NUMBER);
 }
 
 void
@@ -923,30 +863,23 @@ setProgress (int percentage, const char *proc_str)
   return 0;
 }
 
+static pthread_mutex_t responce_lock = PTHREAD_MUTEX_INITIALIZER;
+
 void
 IPCresponse::print (void)
 {
-  if (ipc_delay_microsec)
-    usleep (ipc_delay_microsec);
-  int stringSize = sb->length ();
-  writeResponseHeader (requestID, responseType, responseStatus, stringSize);
-  if (stringSize > 0)
-    {
-      char *s = sb->toString ();
-      hrtime_t start_time = gethrtime ();
-      int use_write = 1;
-      if (use_write)
-       write (1, s, stringSize); // write(1, sb->toString(), stringSize);
-      else
-       {
-         cout << s;
-         cout.flush ();
-       }
-      hrtime_t end_time = gethrtime ();
-      unsigned long long time_stamp = end_time - start_time;
-      ipc_response_log (TRACE_LVL_3, "ReqID %x flush time %llu  nanosec \n", requestID, time_stamp);
-      free (s);
-    }
+  char buf[23];
+  int sz = responseType == RESPONSE_TYPE_HANDSHAKE ?
+      IPC_VERSION_NUMBER : sb->length ();
+  snprintf (buf, sizeof (buf), "%02x%08x%02x%02x%08x", HEADER_MARKER,
+           requestID, responseType, responseStatus, sz);
+  pthread_mutex_lock (&responce_lock);
+  ipc_response_trace (TRACE_LVL_1,
+                     "IPCresponse: ID=%08x type=%02x status=%02x sz:%6d\n",
+                     requestID, responseType, responseStatus, sz);
+  write (1, buf, 22);
+  sb->write (1);
+  pthread_mutex_unlock (&responce_lock);
 }
 
 void
@@ -974,9 +907,7 @@ readRequestHeader ()
   if (requestType == REQUEST_TYPE_HANDSHAKE)
     {
       // write the ack directly to the wire, not through the response queue
-      // writeAckFast(requestID);
       writeAck (requestID, channelID);
-      maxSize = 0;
       writeHandshake (requestID, channelID);
       ipc_request_trace (TRACE_LVL_1, "RQ: HANDSHAKE --- %x ----- %x ---- %x --- %x -RequestHeaderEnd\n", requestID, requestType, channelID, nBytes);
     }
index 05ff30ba34bfc0e8932674cb0323fa05b1a0aa4a..6c97dc7ee0c5fe12f9fa888b77b4befbe0719fb5 100644 (file)
@@ -168,6 +168,8 @@ extern int ipc_single_threaded_mode;
 extern DbeThreadPool *responseThreadPool;
 extern DbeThreadPool *ipcThreadPool;
 extern int cancelRequestedChannelID;
+extern int cancellableChannelID;
+extern int error_flag;
 
 void ipc_default_log (const char *fmt, ...) __attribute__ ((format (printf, 1, 2)));
 void ipc_response_log (IPCTraceLevel, const char *fmt, ...) __attribute__ ((format (printf, 2, 3)));