diff main/database/src/pq_lo.cc @ 11422:b1f26b72b61a octave-forge

Large object import/export also from/to pipes.
author i7tiol
date Tue, 29 Jan 2013 18:20:21 +0000
parents 9aee227e296c
children 35e9e4b6ab34
line wrap: on
line diff
--- a/main/database/src/pq_lo.cc	Tue Jan 29 10:11:46 2013 +0000
+++ b/main/database/src/pq_lo.cc	Tue Jan 29 18:20:21 2013 +0000
@@ -21,6 +21,8 @@
 #include <octave/ov-struct.h>
 #include <octave/Cell.h>
 
+#include <stdio.h>
+
 #include "command.h"
 #include <postgresql/libpq/libpq-fs.h>
 
@@ -28,11 +30,289 @@
 // PKG_ADD: autoload ("pq_lo_export", "pq_interface.oct");
 // PKG_ADD: autoload ("pq_lo_unlink", "pq_interface.oct");
 
+#define OCT_PQ_BUFSIZE 1024
+
+// For cleanup handling this is a class.
+class pipe_to_lo
+{
+public:
+
+  pipe_to_lo (octave_pq_connection &, const char *, bool, std::string &);
+
+  ~pipe_to_lo (void);
+
+  bool valid (void) { return oid_valid; }
+
+  Oid get_oid (void) { return oid; }
+
+  std::string &msg;
+
+private:
+
+  Oid oid;
+
+  FILE *fp;
+
+  bool oid_valid;
+
+  int lod;
+
+  bool commit;
+
+  octave_pq_connection &oct_pq_conn;
+
+  PGconn *conn;
+};
+
+pipe_to_lo::pipe_to_lo (octave_pq_connection &a_oct_pq_conn,
+                        const char *cmd, bool acommit, std::string &amsg) :
+  oct_pq_conn (a_oct_pq_conn), conn (a_oct_pq_conn.octave_pq_get_conn ()),
+  oid (0), fp (NULL), oid_valid (false), lod (-1), commit (acommit), msg (amsg)
+{
+  BEGIN_INTERRUPT_IMMEDIATELY_IN_FOREIGN_CODE;
+  oid = lo_creat (conn, INV_WRITE);
+  END_INTERRUPT_IMMEDIATELY_IN_FOREIGN_CODE;
+  if (! oid || oid == InvalidOid)
+    {
+      msg = PQerrorMessage (conn);
+
+      oid = 0;
+
+      return;
+    }
+
+  if (! (fp = popen (cmd, "r")))
+    {
+      msg = "could not create pipe";
+
+      return;
+    }
+
+  BEGIN_INTERRUPT_IMMEDIATELY_IN_FOREIGN_CODE;
+  lod = lo_open (conn, oid, INV_WRITE);
+  END_INTERRUPT_IMMEDIATELY_IN_FOREIGN_CODE;
+  if (lod == -1)
+    {
+      msg = PQerrorMessage (conn);
+
+      return;
+    }
+
+  char buff [OCT_PQ_BUFSIZE];
+
+  int nb, pnb;
+
+  while (true)
+    {
+      BEGIN_INTERRUPT_IMMEDIATELY_IN_FOREIGN_CODE;
+      nb = fread (buff, 1, OCT_PQ_BUFSIZE, fp);
+      END_INTERRUPT_IMMEDIATELY_IN_FOREIGN_CODE;
+
+      if (! nb) break;
+
+      BEGIN_INTERRUPT_IMMEDIATELY_IN_FOREIGN_CODE;
+      pnb = lo_write (conn, lod, buff, nb);
+      END_INTERRUPT_IMMEDIATELY_IN_FOREIGN_CODE;
+      if (pnb != nb)
+        {
+          msg = PQerrorMessage (conn);
+
+          break;
+        }
+      }
+  if (nb) return;
+
+  if (pclose (fp) == -1)
+    error ("error closing pipe");
+
+  fp = NULL;
+
+  if (lo_close (conn, lod))
+    msg = PQerrorMessage (conn);
+  else
+    oid_valid = true;
+
+  lod = -1;
+}
+
+pipe_to_lo::~pipe_to_lo (void)
+{
+  if (lod != -1)
+    {
+      if (lo_close (conn, lod))
+        error ("%s", PQerrorMessage (conn));
+
+      lod = -1;
+    }
+
+  if (oid && ! oid_valid)
+    {
+      if (lo_unlink (conn, oid) == -1)
+        error ("error unlinking new large object with oid %i", oid);
+    }
+  else
+    oid = 0;
+
+  if (fp)
+    {
+      if (pclose (fp) == -1)
+        error ("error closing pipe");
+
+      fp = NULL;
+    }
+
+  if (commit)
+    {
+      std::string cmd ("commit;");
+      Cell params;
+      Cell ptypes (1, 0);
+      Cell rtypes;
+      std::string caller ("pq_lo_import");
+      command c (oct_pq_conn, cmd, params, ptypes, rtypes, caller);
+
+      if (c.good ())
+        c.process_single_result ();
+
+      if (! c.good ())
+        error ("pq_lo_import: could not commit");
+    }
+}
+
+// For cleanup handling this is a class.
+class lo_to_pipe
+{
+public:
+
+  lo_to_pipe (octave_pq_connection &, Oid, const char *, bool, std::string &);
+
+  ~lo_to_pipe (void);
+
+  bool valid (void) { return success; }
+
+  std::string &msg;
+
+private:
+
+  Oid oid;
+
+  FILE *fp;
+
+  bool success;
+
+  int lod;
+
+  bool commit;
+
+  octave_pq_connection &oct_pq_conn;
+
+  PGconn *conn;
+};
+
+lo_to_pipe::lo_to_pipe (octave_pq_connection &a_oct_pq_conn, Oid aoid,
+                        const char *cmd, bool acommit, std::string &amsg) :
+  oct_pq_conn (a_oct_pq_conn), conn (a_oct_pq_conn.octave_pq_get_conn ()),
+  fp (NULL), success (false), lod (-1), commit (acommit), msg (amsg), oid (aoid)
+{
+  if (! (fp = popen (cmd, "w")))
+    {
+      msg = "could not create pipe";
+
+      return;
+    }
+
+  BEGIN_INTERRUPT_IMMEDIATELY_IN_FOREIGN_CODE;
+  lod = lo_open (conn, oid, INV_READ);
+  END_INTERRUPT_IMMEDIATELY_IN_FOREIGN_CODE;
+  if (lod == -1)
+    {
+      msg = PQerrorMessage (conn);
+
+      return;
+    }
+
+  char buff [OCT_PQ_BUFSIZE];
+
+  int nb, pnb;
+
+  while (true)
+    {
+      BEGIN_INTERRUPT_IMMEDIATELY_IN_FOREIGN_CODE;
+      pnb = lo_read (conn, lod, buff, OCT_PQ_BUFSIZE);
+      END_INTERRUPT_IMMEDIATELY_IN_FOREIGN_CODE;
+
+      if (pnb == -1)
+        {
+          msg = PQerrorMessage (conn);
+
+          break;
+        }
+
+      if (! pnb) break;
+
+      BEGIN_INTERRUPT_IMMEDIATELY_IN_FOREIGN_CODE;
+      nb = fwrite (buff, 1, pnb, fp);
+      END_INTERRUPT_IMMEDIATELY_IN_FOREIGN_CODE;
+      if (nb != pnb)
+        {
+          msg = "error writing to pipe";
+
+          break;
+        }
+    }
+  if (pnb) return;
+
+  if (pclose (fp) == -1)
+    error ("error closing pipe");
+
+  fp = NULL;
+
+  if (lo_close (conn, lod))
+    msg = PQerrorMessage (conn);
+  else
+    success = true;
+
+  lod = -1;
+}
+
+lo_to_pipe::~lo_to_pipe (void)
+{
+  if (lod != -1)
+    {
+      if (lo_close (conn, lod))
+        error ("%s", PQerrorMessage (conn));
+
+      lod = -1;
+    }
+
+  if (fp)
+    {
+      if (pclose (fp) == -1)
+        error ("error closing pipe");
+
+      fp = NULL;
+    }
+
+  if (commit)
+    {
+      std::string cmd ("commit;");
+      Cell params;
+      Cell ptypes (1, 0);
+      Cell rtypes;
+      std::string caller ("pq_lo_export");
+      command c (oct_pq_conn, cmd, params, ptypes, rtypes, caller);
+
+      if (c.good ())
+        c.process_single_result ();
+
+      if (! c.good ())
+        error ("pq_lo_export: could not commit");
+    }
+}
 
 DEFUN_DLD (pq_lo_import, args, ,
            "-*- texinfo -*-\n\
 @deftypefn {Loadable Function} {@var{oid}} pq_lo_import (@var{connection}, @var{path})\n\
-Imports the file in @var{path} on the client side as a large object into the database associated with @var{connection} and returns the Oid of the new large object.\n\
+Imports the file in @var{path} on the client side as a large object into the database associated with @var{connection} and returns the Oid of the new large object. If @var{path} ends with a @code{|}, it is take as a shell command whose output is piped into a large object.\n\
 @end deftypefn")
 {
   std::string fname ("pq_lo_import");
@@ -57,6 +337,27 @@
       return retval;
     }
 
+  bool from_pipe = false;
+  unsigned int l = path.size ();
+  if (l && path[l - 1] == '|')
+    {
+      unsigned int pos;
+      // There seemed to be a bug in my C++ library so that
+      // path.find_last_not_of (" \t\n\r\f", l - 1))
+      // returned l - 1 ! This is the workaround.
+      path.erase (l - 1, 1);
+      if ((pos = path.find_last_not_of (" \t\n\r\f"))
+          == std::string::npos)
+        {
+          error ("%s: no command found to pipe from", fname.c_str ());
+
+          return retval;
+        }
+      path.erase (pos + 1, std::string::npos);
+
+      from_pipe = true;
+    }
+
   octave_base_value& rep = const_cast<octave_base_value&> (args(0).get_rep ());
 
   octave_pq_connection &oct_pq_conn = dynamic_cast<octave_pq_connection&> (rep);
@@ -109,16 +410,28 @@
         }
     }
 
-  Oid oid;
+  Oid oid = 0;
 
   bool import_error = false;
   std::string msg;
 
-  if (! (oid = lo_import (conn, path.c_str ())))
+  if (from_pipe)
     {
-      import_error = true;
-      msg = PQerrorMessage (conn);
+      pipe_to_lo tp (oct_pq_conn, path.c_str (), make_tblock, msg);
+
+      make_tblock = false; // commit handled by destructor of pipe_to_lo
+
+      if (tp.valid ())
+        oid = tp.get_oid ();
+      else
+        import_error = true;
     }
+  else
+    if (! (oid = lo_import (conn, path.c_str ())))
+      {
+        import_error = true;
+        msg = PQerrorMessage (conn);
+      }
 
   // if we started the transaction, commit it even in case of import failure
   bool commit_error = false;
@@ -155,7 +468,7 @@
 DEFUN_DLD (pq_lo_export, args, ,
            "-*- texinfo -*-\n\
 @deftypefn {Loadable Function} pq_lo_export (@var{connection}, @var{oid}, @var{path})\n\
-Exports the large object of Oid @var{oid} in the database associated with @var{connection} to the file @var{path} on the client side.\n\
+Exports the large object of Oid @var{oid} in the database associated with @var{connection} to the file @var{path} on the client side. If @var{path} starts with a @code{|}, it is taken as a shell commant to pipe to.\n\
 @end deftypefn")
 {
   std::string fname ("pq_lo_export");
@@ -180,6 +493,22 @@
       return retval;
     }
 
+  bool to_pipe = false;
+  if (! path.empty () && path[0] == '|')
+    {
+      unsigned int pos;
+      if ((pos = path.find_first_not_of (" \t\n\r\f", 1))
+          == std::string::npos)
+        {
+          error ("%s: no command found to pipe to", fname.c_str ());
+
+          return retval;
+        }
+      path.erase (0, pos);
+
+      to_pipe = true;
+    }
+
   Oid oid = args(1).uint_value ();
 
   if (error_state)
@@ -245,11 +574,21 @@
   bool export_error = false;
   std::string msg;
 
-  if (lo_export (conn, oid, path.c_str ()) == -1)
+  if (to_pipe)
     {
-      export_error = true;
-      msg = PQerrorMessage (conn);
+      lo_to_pipe tp (oct_pq_conn, oid, path.c_str (), make_tblock, msg);
+
+      make_tblock = false; // commit handled by destructor of lo_to_pipe
+
+      if (! tp.valid ())
+        export_error = true;
     }
+  else
+    if (lo_export (conn, oid, path.c_str ()) == -1)
+      {
+        export_error = true;
+        msg = PQerrorMessage (conn);
+      }
 
   // if we started the transaction, commit it even in case of export failure
   bool commit_error = false;