Mercurial > forge
diff main/database/src/pq_lo.cc @ 11422:b1f26b72b61a octave-forge
Large object import/export also from/to pipes.
author | i7tiol |
---|---|
date | Tue, 29 Jan 2013 18:20:21 +0000 |
parents | 9aee227e296c |
children | 35e9e4b6ab34 |
line wrap: on
line diff
--- a/main/database/src/pq_lo.cc Tue Jan 29 10:11:46 2013 +0000 +++ b/main/database/src/pq_lo.cc Tue Jan 29 18:20:21 2013 +0000 @@ -21,6 +21,8 @@ #include <octave/ov-struct.h> #include <octave/Cell.h> +#include <stdio.h> + #include "command.h" #include <postgresql/libpq/libpq-fs.h> @@ -28,11 +30,289 @@ // PKG_ADD: autoload ("pq_lo_export", "pq_interface.oct"); // PKG_ADD: autoload ("pq_lo_unlink", "pq_interface.oct"); +#define OCT_PQ_BUFSIZE 1024 + +// For cleanup handling this is a class. +class pipe_to_lo +{ +public: + + pipe_to_lo (octave_pq_connection &, const char *, bool, std::string &); + + ~pipe_to_lo (void); + + bool valid (void) { return oid_valid; } + + Oid get_oid (void) { return oid; } + + std::string &msg; + +private: + + Oid oid; + + FILE *fp; + + bool oid_valid; + + int lod; + + bool commit; + + octave_pq_connection &oct_pq_conn; + + PGconn *conn; +}; + +pipe_to_lo::pipe_to_lo (octave_pq_connection &a_oct_pq_conn, + const char *cmd, bool acommit, std::string &amsg) : + oct_pq_conn (a_oct_pq_conn), conn (a_oct_pq_conn.octave_pq_get_conn ()), + oid (0), fp (NULL), oid_valid (false), lod (-1), commit (acommit), msg (amsg) +{ + BEGIN_INTERRUPT_IMMEDIATELY_IN_FOREIGN_CODE; + oid = lo_creat (conn, INV_WRITE); + END_INTERRUPT_IMMEDIATELY_IN_FOREIGN_CODE; + if (! oid || oid == InvalidOid) + { + msg = PQerrorMessage (conn); + + oid = 0; + + return; + } + + if (! (fp = popen (cmd, "r"))) + { + msg = "could not create pipe"; + + return; + } + + BEGIN_INTERRUPT_IMMEDIATELY_IN_FOREIGN_CODE; + lod = lo_open (conn, oid, INV_WRITE); + END_INTERRUPT_IMMEDIATELY_IN_FOREIGN_CODE; + if (lod == -1) + { + msg = PQerrorMessage (conn); + + return; + } + + char buff [OCT_PQ_BUFSIZE]; + + int nb, pnb; + + while (true) + { + BEGIN_INTERRUPT_IMMEDIATELY_IN_FOREIGN_CODE; + nb = fread (buff, 1, OCT_PQ_BUFSIZE, fp); + END_INTERRUPT_IMMEDIATELY_IN_FOREIGN_CODE; + + if (! nb) break; + + BEGIN_INTERRUPT_IMMEDIATELY_IN_FOREIGN_CODE; + pnb = lo_write (conn, lod, buff, nb); + END_INTERRUPT_IMMEDIATELY_IN_FOREIGN_CODE; + if (pnb != nb) + { + msg = PQerrorMessage (conn); + + break; + } + } + if (nb) return; + + if (pclose (fp) == -1) + error ("error closing pipe"); + + fp = NULL; + + if (lo_close (conn, lod)) + msg = PQerrorMessage (conn); + else + oid_valid = true; + + lod = -1; +} + +pipe_to_lo::~pipe_to_lo (void) +{ + if (lod != -1) + { + if (lo_close (conn, lod)) + error ("%s", PQerrorMessage (conn)); + + lod = -1; + } + + if (oid && ! oid_valid) + { + if (lo_unlink (conn, oid) == -1) + error ("error unlinking new large object with oid %i", oid); + } + else + oid = 0; + + if (fp) + { + if (pclose (fp) == -1) + error ("error closing pipe"); + + fp = NULL; + } + + if (commit) + { + std::string cmd ("commit;"); + Cell params; + Cell ptypes (1, 0); + Cell rtypes; + std::string caller ("pq_lo_import"); + command c (oct_pq_conn, cmd, params, ptypes, rtypes, caller); + + if (c.good ()) + c.process_single_result (); + + if (! c.good ()) + error ("pq_lo_import: could not commit"); + } +} + +// For cleanup handling this is a class. +class lo_to_pipe +{ +public: + + lo_to_pipe (octave_pq_connection &, Oid, const char *, bool, std::string &); + + ~lo_to_pipe (void); + + bool valid (void) { return success; } + + std::string &msg; + +private: + + Oid oid; + + FILE *fp; + + bool success; + + int lod; + + bool commit; + + octave_pq_connection &oct_pq_conn; + + PGconn *conn; +}; + +lo_to_pipe::lo_to_pipe (octave_pq_connection &a_oct_pq_conn, Oid aoid, + const char *cmd, bool acommit, std::string &amsg) : + oct_pq_conn (a_oct_pq_conn), conn (a_oct_pq_conn.octave_pq_get_conn ()), + fp (NULL), success (false), lod (-1), commit (acommit), msg (amsg), oid (aoid) +{ + if (! (fp = popen (cmd, "w"))) + { + msg = "could not create pipe"; + + return; + } + + BEGIN_INTERRUPT_IMMEDIATELY_IN_FOREIGN_CODE; + lod = lo_open (conn, oid, INV_READ); + END_INTERRUPT_IMMEDIATELY_IN_FOREIGN_CODE; + if (lod == -1) + { + msg = PQerrorMessage (conn); + + return; + } + + char buff [OCT_PQ_BUFSIZE]; + + int nb, pnb; + + while (true) + { + BEGIN_INTERRUPT_IMMEDIATELY_IN_FOREIGN_CODE; + pnb = lo_read (conn, lod, buff, OCT_PQ_BUFSIZE); + END_INTERRUPT_IMMEDIATELY_IN_FOREIGN_CODE; + + if (pnb == -1) + { + msg = PQerrorMessage (conn); + + break; + } + + if (! pnb) break; + + BEGIN_INTERRUPT_IMMEDIATELY_IN_FOREIGN_CODE; + nb = fwrite (buff, 1, pnb, fp); + END_INTERRUPT_IMMEDIATELY_IN_FOREIGN_CODE; + if (nb != pnb) + { + msg = "error writing to pipe"; + + break; + } + } + if (pnb) return; + + if (pclose (fp) == -1) + error ("error closing pipe"); + + fp = NULL; + + if (lo_close (conn, lod)) + msg = PQerrorMessage (conn); + else + success = true; + + lod = -1; +} + +lo_to_pipe::~lo_to_pipe (void) +{ + if (lod != -1) + { + if (lo_close (conn, lod)) + error ("%s", PQerrorMessage (conn)); + + lod = -1; + } + + if (fp) + { + if (pclose (fp) == -1) + error ("error closing pipe"); + + fp = NULL; + } + + if (commit) + { + std::string cmd ("commit;"); + Cell params; + Cell ptypes (1, 0); + Cell rtypes; + std::string caller ("pq_lo_export"); + command c (oct_pq_conn, cmd, params, ptypes, rtypes, caller); + + if (c.good ()) + c.process_single_result (); + + if (! c.good ()) + error ("pq_lo_export: could not commit"); + } +} DEFUN_DLD (pq_lo_import, args, , "-*- texinfo -*-\n\ @deftypefn {Loadable Function} {@var{oid}} pq_lo_import (@var{connection}, @var{path})\n\ -Imports the file in @var{path} on the client side as a large object into the database associated with @var{connection} and returns the Oid of the new large object.\n\ +Imports the file in @var{path} on the client side as a large object into the database associated with @var{connection} and returns the Oid of the new large object. If @var{path} ends with a @code{|}, it is take as a shell command whose output is piped into a large object.\n\ @end deftypefn") { std::string fname ("pq_lo_import"); @@ -57,6 +337,27 @@ return retval; } + bool from_pipe = false; + unsigned int l = path.size (); + if (l && path[l - 1] == '|') + { + unsigned int pos; + // There seemed to be a bug in my C++ library so that + // path.find_last_not_of (" \t\n\r\f", l - 1)) + // returned l - 1 ! This is the workaround. + path.erase (l - 1, 1); + if ((pos = path.find_last_not_of (" \t\n\r\f")) + == std::string::npos) + { + error ("%s: no command found to pipe from", fname.c_str ()); + + return retval; + } + path.erase (pos + 1, std::string::npos); + + from_pipe = true; + } + octave_base_value& rep = const_cast<octave_base_value&> (args(0).get_rep ()); octave_pq_connection &oct_pq_conn = dynamic_cast<octave_pq_connection&> (rep); @@ -109,16 +410,28 @@ } } - Oid oid; + Oid oid = 0; bool import_error = false; std::string msg; - if (! (oid = lo_import (conn, path.c_str ()))) + if (from_pipe) { - import_error = true; - msg = PQerrorMessage (conn); + pipe_to_lo tp (oct_pq_conn, path.c_str (), make_tblock, msg); + + make_tblock = false; // commit handled by destructor of pipe_to_lo + + if (tp.valid ()) + oid = tp.get_oid (); + else + import_error = true; } + else + if (! (oid = lo_import (conn, path.c_str ()))) + { + import_error = true; + msg = PQerrorMessage (conn); + } // if we started the transaction, commit it even in case of import failure bool commit_error = false; @@ -155,7 +468,7 @@ DEFUN_DLD (pq_lo_export, args, , "-*- texinfo -*-\n\ @deftypefn {Loadable Function} pq_lo_export (@var{connection}, @var{oid}, @var{path})\n\ -Exports the large object of Oid @var{oid} in the database associated with @var{connection} to the file @var{path} on the client side.\n\ +Exports the large object of Oid @var{oid} in the database associated with @var{connection} to the file @var{path} on the client side. If @var{path} starts with a @code{|}, it is taken as a shell commant to pipe to.\n\ @end deftypefn") { std::string fname ("pq_lo_export"); @@ -180,6 +493,22 @@ return retval; } + bool to_pipe = false; + if (! path.empty () && path[0] == '|') + { + unsigned int pos; + if ((pos = path.find_first_not_of (" \t\n\r\f", 1)) + == std::string::npos) + { + error ("%s: no command found to pipe to", fname.c_str ()); + + return retval; + } + path.erase (0, pos); + + to_pipe = true; + } + Oid oid = args(1).uint_value (); if (error_state) @@ -245,11 +574,21 @@ bool export_error = false; std::string msg; - if (lo_export (conn, oid, path.c_str ()) == -1) + if (to_pipe) { - export_error = true; - msg = PQerrorMessage (conn); + lo_to_pipe tp (oct_pq_conn, oid, path.c_str (), make_tblock, msg); + + make_tblock = false; // commit handled by destructor of lo_to_pipe + + if (! tp.valid ()) + export_error = true; } + else + if (lo_export (conn, oid, path.c_str ()) == -1) + { + export_error = true; + msg = PQerrorMessage (conn); + } // if we started the transaction, commit it even in case of export failure bool commit_error = false;