Mercurial > forge
view main/database/src/command.cc @ 12720:52ca082757c2 octave-forge tip
Update copyright notices.
author | i7tiol |
---|---|
date | Sat, 27 Feb 2016 11:21:29 +0000 |
parents | 1af86934c14e |
children |
line wrap: on
line source
/*

Copyright (C) 2012-2016 Olaf Till <i7tiol@t-online.de>

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; If not, see <http://www.gnu.org/licenses/>.

*/

#include <octave/oct.h>
#include <octave/ov-struct.h>
#include <octave/Cell.h>
#include <octave/lo-ieee.h>

#include <cstring>
#include <iostream>
#include <fstream>
#include <string>
#include <vector>

#include "command.h"
#include "converters.h"
#include "error-helpers.h"

#define COPY_HEADER_SIZE 19

#define COUT_RESIZE_STEP 1000 // resize result only after this number of rows

// Construct a command without parameters: dispatch CMD asynchronously
// with PQsendQuery and fetch the first result.
//
// connection: connection object supplying the libpq connection pointer
// cmd:        SQL command text
// rtypes:     optional type names for the returned columns (may be empty)
// who:        name of the caller, used as prefix in error messages
//
// On failure an error is reported through c_verror and 'valid' is
// cleared.
command::command (octave_pq_connection_rep &connection, std::string &cmd,
                  Cell &rtypes, std::string &who)
  : res (NULL), all_fetched (0), valid (1), conn (connection),
    rettypes (rtypes), caller (who)
{
  if (! (cptr = conn.octave_pq_get_conn ()))
    {
      valid = 0;
      c_verror ("%s: connection not open", caller.c_str ());

      // Without a connection there is nothing to dispatch to; calling
      // PQsendQuery with a NULL pointer would only add a second,
      // misleading error message.
      return;
    }

  if (! PQsendQuery (cptr, cmd.c_str ()))
    {
      valid = 0;
      c_verror ("%s: could not dispatch command: %s", caller.c_str (),
                PQerrorMessage (cptr));
    }
  else
    {
      res = PQgetResult (cptr);
      check_first_result ();
    }
}

// Construct a command with parameters: convert each element of PARAMS
// to PostgreSQL binary format and execute CMD with PQexecParams
// (binary results requested).
//
// connection: connection object supplying the libpq connection pointer
// cmd:        SQL command text
// params:     parameter values; a real scalar NA is passed as SQL NULL
// ptypes:     per-parameter type names; an empty entry means the type
//             is determined from the Octave value
// rtypes:     optional type names for the returned columns (may be empty)
// who:        name of the caller, used as prefix in error messages
//
// The command is only executed if all conversions succeeded; otherwise
// 'valid' is cleared.
command::command (octave_pq_connection_rep &connection, std::string &cmd,
                  Cell &params, Cell &ptypes, Cell &rtypes, std::string &who)
  : res (NULL), all_fetched (1), valid (1), conn (connection),
    rettypes (rtypes), caller (who)
{
  if (! (cptr = conn.octave_pq_get_conn ()))
    {
      valid = 0;
      c_verror ("%s: connection not open", caller.c_str ());
    }

  int npars = params.numel ();

  // Standard containers instead of variable-length arrays (which are a
  // compiler extension in C++); all entries start out zeroed, formats
  // are preset to 1 (binary).
  std::vector<char *> vals (npars);
  std::vector<oct_pq_dynvec_t> valsvec;
  valsvec.resize (npars);
  std::vector<int> pl (npars);
  std::vector<int> pf (npars, 1);
  std::vector<Oid> oids (npars);

  // Dummy target for zero-length values: libpq interprets a NULL value
  // pointer as SQL NULL, so an empty value still needs a valid address.
  static char empty_value = 0;

  for (int i = 0; i < npars; i++)
    {
      if (params(i).is_real_scalar () && params(i).isna ().bool_value ())
        {
          // NA is transmitted as SQL NULL
          vals[i] = NULL;

          oids[i] = 0;
        }
      else
        {
          pq_oct_type_t oct_type;
          oct_pq_conv_t *conv;

          if (ptypes(i).is_empty ())
            {
              oct_type = simple;

              if (! (conv = pgtype_from_octtype (conn, params(i))))
                {
                  valid = 0;
                  break;
                }

              // array not possible here
              oids[i] = conv->oid;
            }
          else
            {
              bool err;
              std::string s;
              SET_ERR (s = ptypes(i).string_value (), err);
              if (err)
                {
                  valid = 0;
                  c_verror ("%s: parameter type specification no string",
                            caller.c_str ());
                  break;
                }

              if (! (conv = pgtype_from_spec (conn, s, oct_type)))
                {
                  valid = 0;
                  break;
                }

              if (oct_type == array)
                oids[i] = conv->aoid;
              else
                oids[i] = conv->oid;
            }

          switch (oct_type)
            {
            case simple:
              if (conv->from_octave_bin (conn, params(i), valsvec[i]))
                valid = 0;
              break;

            case array:
              if (from_octave_bin_array (conn, params(i), valsvec[i], conv))
                valid = 0;
              break;

            case composite:
              if (from_octave_bin_composite (conn, params(i), valsvec[i],
                                             conv))
                valid = 0;
              break;

            default:
              // should not get here
              valid = 0;
              c_verror ("%s: internal error, undefined type identifier",
                        caller.c_str ());
            }

          if (! valid)
            break;

          // front () must not be called on an empty vector
          if (valsvec[i].size () > 0)
            vals[i] = &(valsvec[i].front ());
          else
            vals[i] = &empty_value;

          pl[i] = valsvec[i].size ();
        }
    }

  if (valid)
    {
      BEGIN_INTERRUPT_IMMEDIATELY_IN_FOREIGN_CODE;
      res = PQexecParams (cptr, cmd.c_str (), npars,
                          npars > 0 ? &oids[0] : NULL,
                          npars > 0 ? &vals[0] : NULL,
                          npars > 0 ? &pl[0] : NULL,
                          npars > 0 ? &pf[0] : NULL,
                          1);
      END_INTERRUPT_IMMEDIATELY_IN_FOREIGN_CODE;

      check_first_result ();
    }
}

// Build a 1 x nel struct array describing the element types of the
// composite type handled by CONV, with fields "name", "is_array",
// "is_composite", "is_enum" and "elements". For composite elements,
// "elements" is filled recursively.
//
// err is set to true if a converter lookup fails; it is never reset to
// false here, so the caller has to initialize it. On error the
// (incomplete) map is returned.
octave_map command::get_elements_typeinfo (oct_pq_conv_t *conv, bool &err)
{
  int nel = conv->el_oids.size ();

  octave_map ret (dim_vector (1, nel));

  Cell types_name (1, nel);
  Cell types_array (1, nel);
  Cell types_composite (1, nel);
  Cell types_enum (1, nel);
  Cell types_elements (1, nel);

  for (int i = 0; i < nel; i++)
    {
      oct_pq_conv_t *el_conv;
      pq_oct_type_t oct_type;

      if (! (el_conv = pgtype_from_spec (conn, conv->el_oids[i],
                                         conv->conv_cache[i], oct_type)))
        {
          err = true;
          return ret;
        }

      types_name(i) = octave_value (el_conv->name);
      types_array(i) = octave_value (oct_type == array);
      types_enum(i) = octave_value (el_conv->is_enum);
      types_composite(i) = octave_value (el_conv->is_composite);

      if (el_conv->is_composite)
        {
          bool rec_err = false;

          types_elements(i) =
            octave_value (get_elements_typeinfo (el_conv, rec_err));

          if (rec_err)
            {
              err = true;
              return ret;
            }
        }
    }

  ret.assign ("name", types_name);
  ret.assign ("is_array", types_array);
  ret.assign ("is_composite", types_composite);
  ret.assign ("is_enum", types_enum);
  ret.assign ("elements", types_elements);

  return ret;
}

// Process one result of the command and return its Octave
// representation.
//
// If no result is pending in 'res', the next one is fetched with
// PQgetResult; if there is none, 'all_fetched' is set. Otherwise the
// result is dispatched on its status to the respective handler.
//
// infile, cdata, ctypes, coids, cin_var: forwarded to copy_in_handler
// outfile:                               forwarded to copy_out_handler
//
// Errors clear 'valid'. The result held in 'res' is cleared before
// returning.
octave_value command::process_single_result (const std::string &infile,
                                             const std::string &outfile,
                                             const Cell &cdata,
                                             const Cell &ctypes,
                                             bool coids,
                                             bool cin_var)
{
  octave_value retval;

  // first result is already fetched
  if (! res && (res = PQgetResult (cptr)))
    state = PQresultStatus (res);

  if (! res)
    all_fetched = 1;
  else
    {
      switch (state)
        {
        case PGRES_BAD_RESPONSE:
          valid = 0;
          c_verror ("%s: server response not understood", caller.c_str ());
          break;

        case PGRES_FATAL_ERROR:
          valid = 0;
          c_verror ("%s: fatal error: %s", caller.c_str (),
                    PQresultErrorMessage (res));
          break;

        case PGRES_COMMAND_OK:
          retval = command_ok_handler ();
          break;

        case PGRES_TUPLES_OK:
          retval = tuples_ok_handler ();
          break;

        case PGRES_COPY_OUT:
          retval = copy_out_handler (outfile);
          break;

        case PGRES_COPY_IN:
          retval = copy_in_handler (infile, cdata, ctypes, coids, cin_var);
          break;

        case PGRES_NONFATAL_ERROR:
          break;

        default:
          valid = 0;
          c_verror ("internal error, unexpected server response");
        }

      if (res) // could have been changed by a handler
        {
          PQclear (res);
          res = NULL;
        }
    }

  return retval;
}

// Convert a result containing tuples into a struct with the fields
// "data" (cell array, rows x columns), "columns" (column names) and
// "types" (struct array with type information per column).
//
// SQL NULL values are returned as NA. If 'rettypes' is not empty, it
// must contain one type name per column and overrides the types
// reported by the server. On error 'valid' is cleared and an empty
// value is returned.
octave_value command::tuples_ok_handler (void)
{
  octave_map ret;

  int nt = PQntuples (res);
  int nf = PQnfields (res);

  Cell data (nt, nf);
  Cell columns (1, nf);
  Cell types_name (1, nf);
  Cell types_array (1, nf);
  Cell types_composite (1, nf);
  Cell types_enum (1, nf);
  Cell types_elements (1, nf);
  octave_map types (dim_vector (1, nf));

  bool rtypes_given;
  int l = rettypes.numel ();
  if (l > 0)
    {
      if (l != nf)
        {
          valid = 0;
          c_verror ("%s: wrong number of given returned types",
                    caller.c_str ());
          return octave_value_list ();
        }

      rtypes_given = true;
    }
  else
    rtypes_given = false;

  for (int j = 0; j < nf; j++) // j is column
    {
      columns(j) = octave_value (PQfname (res, j));

      // format of the column: zero is text, non-zero is binary
      int f = PQfformat (res, j);

      oct_pq_to_octave_fp_t simple_type_to_octave;
      to_octave_array_fp_t array_to_octave;
      to_octave_composite_fp_t composite_to_octave;

      oct_pq_conv_t *conv = NULL; // silence inadequate warning by
                                  // initializing it here
      pq_oct_type_t oct_type;

      if (rtypes_given) // for internal reading of system tables
        {
          std::string type;
          bool err;
          SET_ERR (type = rettypes(j).string_value (), err);
          if (err)
            {
              valid = 0;
              c_verror ("%s: could not convert given type to string",
                        caller.c_str ());
              break;
            }
          else if (! (conv = pgtype_from_spec (conn, type, oct_type)))
            {
              valid = 0;
              break;
            }
        }
      else if (! (conv = pgtype_from_spec (conn, PQftype (res, j),
                                           oct_type)))
        {
          valid = 0;
          break;
        }

      if (f)
        {
          array_to_octave = &to_octave_bin_array;
          composite_to_octave = &to_octave_bin_composite;
          // will be NULL for non-simple converters
          simple_type_to_octave = conv->to_octave_bin;
        }
      else
        {
          array_to_octave = &to_octave_str_array;
          composite_to_octave = &to_octave_str_composite;
          // will be NULL for non-simple converters
          simple_type_to_octave = conv->to_octave_str;
        }

      // prepare type information
      types_name(j) = octave_value (conv->name);
      types_array(j) = octave_value (oct_type == array);
      types_enum(j) = octave_value (conv->is_enum);
      types_composite(j) = octave_value (conv->is_composite);
      if (conv->is_composite)
        {
          // To implement here: recursively go through the elements
          // and return respective recursive structures. This has the
          // side effect that all converters necessary for this query
          // will be looked up and cached (if they aren't already), so
          // in the actual conversion of composite types only cache
          // reads are performed, no map lookups.
          bool err = false;

          types_elements(j) = octave_value (get_elements_typeinfo (conv,
                                                                   err));

          if (err)
            {
              valid = 0;
              break;
            }
        }

      for (int i = 0; i < nt; i++) // i is row
        {
          if (PQgetisnull (res, i, j))
            data(i, j) = octave_value (octave_NA);
          else
            {
              char *v = PQgetvalue (res, i, j);
              int nb = PQgetlength (res, i, j);
              octave_value ov;

              switch (oct_type)
                {
                case simple:
                  if (simple_type_to_octave (conn, v, ov, nb))
                    valid = 0;
                  break;

                case array:
                  if (array_to_octave (conn, v, ov, nb, conv))
                    valid = 0;
                  break;

                case composite:
                  if (composite_to_octave (conn, v, ov, nb, conv))
                    valid = 0;
                  break;

                default:
                  // should not get here
                  c_verror ("%s: internal error, undefined type identifier",
                            caller.c_str ());
                  valid = 0;
                }

              if (valid)
                data(i, j) = ov;
              else
                break;
            }
        }

      if (! valid)
        break;
    }

  if (! valid)
    return octave_value_list ();
  else
    {
      ret.assign ("data", octave_value (data));
      ret.assign ("columns", octave_value (columns));

      types.setfield ("name", types_name);
      types.setfield ("is_array", types_array);
      types.setfield ("is_composite", types_composite);
      types.setfield ("is_enum", types_enum);
      types.setfield ("elements", types_elements);
      ret.assign ("types", octave_value (types));

      return octave_value (ret);
    }
}

// Handle a COPY ... TO STDOUT result: write the data received from the
// server unchecked into the file OUTFILE, then fetch the final result
// of the copy operation into 'res'.
//
// Returns an empty value if the file could not be opened, otherwise
// the string "copy out" (also if a later error cleared 'valid').
octave_value command::copy_out_handler (const std::string &outfile)
{
  octave_value retval;

  if (! outfile.empty ())
    {
      // store unchecked output in file

      std::ofstream ostr (outfile.c_str (), std::ios_base::out);
      if (ostr.fail ())
        {
          valid = 0;
          c_verror ("could not open output file %s", outfile.c_str ());

          return retval;
        }

      char *data;
      int nb;
      // Data is received (and freed) until the server is done, even
      // after a write failure; writing is just skipped then.
      while ((nb = PQgetCopyData (cptr, &data, 0)) > 0)
        {
          if (! (ostr.fail () || ostr.bad ()))
            {
              ostr.write (data, nb);
              // NOTE(review): a failed write is reported, but 'valid'
              // is left untouched, unlike in the other error paths --
              // confirm whether this is intended.
              if (ostr.bad ())
                c_verror ("write to file failed");
            }
          PQfreemem (data);
        }

      if (! ostr.bad ())
        ostr.close ();

      if (nb == -2)
        {
          valid = 0;
          c_verror ("server error in copy-out: %s", PQerrorMessage (cptr));
        }
      else
        {
          // replace the COPY_OUT result by the final command result
          PQclear (res);

          if ((res = PQgetResult (cptr)))
            {
              if ((state = PQresultStatus (res)) == PGRES_FATAL_ERROR)
                {
                  valid = 0;
                  c_verror ("server error in copy-out: %s",
                            PQerrorMessage (cptr));
                }
            }
          else
            {
              valid = 0;
              c_verror ("unexpectedly got no result information");
            }
        }
    }
  else
    {
      valid = 0;
      c_verror ("no output file given");
    }

  return octave_value (std::string ("copy out"));
}

// Handle a COPY ... FROM STDIN result.
//
// If VAR is false, the contents of the file INFILE are sent unchecked
// to the server. If VAR is true, the cell array DATA (rows x columns)
// is converted and sent in PostgreSQL binary copy format.
//
// infile:    name of the input file (used if VAR is false)
// data:      values to copy in (used if VAR is true); a real scalar NA
//            is sent as NULL
// cin_types: per-column type names for DATA; an empty entry means the
//            type is determined from the first non-NA value of the
//            column
// oids:      if true, the first column of DATA holds oids
// var:       selects copy-in from variable instead of from file
//
// Errors clear 'valid' and terminate the copy operation on the server
// with PQputCopyEnd. Returns an empty value for some early errors,
// otherwise the string "copy in".
octave_value command::copy_in_handler (const std::string &infile,
                                       const Cell &data,
                                       const Cell &cin_types,
                                       bool oids,
                                       bool var)
{
  octave_value retval;

#define OCT_PQ_READSIZE 4096

  char buff [OCT_PQ_READSIZE];

  if (! var)
    {
      // read unchecked input from file

      if (infile.empty ())
        {
          valid = 0;
          c_verror ("no input file given");

          return retval;
        }

      std::ifstream istr (infile.c_str (), std::ios_base::in);
      if (istr.fail ())
        {
          c_verror ("could not open input file %s", infile.c_str ());

          PQputCopyEnd (cptr, "could not open input file");

          c_verror ("server error: %s", PQerrorMessage (cptr));

          valid = 0;

          return retval;
        }

      do
        {
          istr.read (buff, OCT_PQ_READSIZE);

          if (istr.bad ())
            {
              valid = 0;
              c_verror ("could not read file %s", infile.c_str ());

              break;
            }
          else
            {
              int nb;

              if ((nb = istr.gcount ()) > 0)
                if (PQputCopyData (cptr, buff, nb) == -1)
                  {
                    valid = 0;
                    c_verror ("%s", PQerrorMessage (cptr));

                    break;
                  }
            }
        }
      while (! istr.eof ());

      istr.close ();

      if (! valid)
        {
          PQputCopyEnd (cptr, "copy-in interrupted");

          c_verror ("%s", PQerrorMessage (cptr));
        }
      else
        {
          if (PQputCopyEnd (cptr, NULL) == -1)
            {
              valid = 0;
              c_verror ("%s", PQerrorMessage (cptr));
            }
          else
            {
              // replace the COPY_IN result by the final command result
              PQclear (res);

              if ((res = PQgetResult (cptr)))
                {
                  if ((state = PQresultStatus (res)) == PGRES_FATAL_ERROR)
                    {
                      valid = 0;
                      c_verror ("server error in copy-in: %s",
                                PQerrorMessage (cptr));
                    }
                }
              else
                {
                  valid = 0;
                  c_verror ("unexpectedly got no result information");
                }
            }
        }
    }
  else
    {
      // copy in from octave variable

      dim_vector dv = data.dims ();
      octave_idx_type r = dv(0);
      octave_idx_type c = dv(1);

      octave_idx_type nf = PQnfields (res);
      if (c != nf + oids)
        {
          valid = 0;
          // octave_idx_type may be wider than int, so the values are
          // converted to match the format specifiers
          c_verror ("variable for copy-in has %ld columns, but should have %ld",
                    long (c), long (nf + oids));

          PQputCopyEnd
            (cptr, "variable for copy-in has wrong number of columns");
        }
      else if (! PQbinaryTuples (res))
        {
          valid = 0;
          c_verror ("copy-in from variable must use binary mode");

          PQputCopyEnd (cptr, "copy-in from variable must use binary mode");
        }
      else
        {
          for (octave_idx_type j = 0; j < nf; j++)
            if (! PQfformat (res, j))
              {
                valid = 0;
                c_verror ("copy-in from variable must use binary mode in all columns");

                PQputCopyEnd (cptr, "copy-in from variable must use binary mode in all columns");

                break;
              }
        }

      if (! valid)
        {
          c_verror ("server error: %s", PQerrorMessage (cptr));

          return retval;
        }

      // Header of the binary copy format: 11 bytes signature, 4 bytes
      // flags (bit 16 set if oids are included) and 4 bytes header
      // extension length (zero, from the memset).
      char header [COPY_HEADER_SIZE];
      memset (header, 0, COPY_HEADER_SIZE);
      strcpy (header, "PGCOPY\n\377\r\n\0");
      uint32_t tpu32 = htobe32 (uint32_t (oids) << 16);
      memcpy (&header[11], &tpu32, 4);

      // the trailer is a field count of -1
      char trailer [2];
      int16_t tp16 = htobe16 (int16_t (-1));
      memcpy (&trailer, &tp16, 2);

      if (PQputCopyData (cptr, header, COPY_HEADER_SIZE) == -1)
        {
          PQputCopyEnd (cptr, "could not send header");

          valid = 0;
          c_verror ("server error: %s", PQerrorMessage (cptr));
        }
      else
        {
          // Per-column converters and type classes, looked up lazily at
          // the first non-NA value of each column. Standard containers
          // are used instead of variable-length arrays (a compiler
          // extension in C++).
          std::vector<oct_pq_conv_t *> convs
            (c, static_cast<oct_pq_conv_t *> (NULL));
          std::vector<pq_oct_type_t> oct_types (c);

          for (octave_idx_type i = 0; i < r; i++) // i is row
            {
              int16_t fc = htobe16 (int16_t (nf));
              if (PQputCopyData (cptr, (char *) &fc, 2) == -1)
                {
                  c_verror ("%s", PQerrorMessage (cptr));

                  PQputCopyEnd (cptr, "error sending field count");

                  c_verror ("server error: %s", PQerrorMessage (cptr));

                  valid = 0;

                  break;
                }

              // j is column of argument data
              for (octave_idx_type j = 0; j < c; j++)
                {
                  if (data(i, j).is_real_scalar ()
                      && data(i, j).isna ().bool_value ())
                    {
                      // NULL is sent as a length of -1 without data
                      int32_t t = htobe32 (int32_t (-1));
                      if (PQputCopyData (cptr, (char *) &t, 4) == -1)
                        {
                          valid = 0;
                          c_verror ("could not send NULL in copy-in");

                          break;
                        }
                    }
                  else
                    {
                      if (! convs [j])
                        {
                          if ((j == 0) && oids)
                            {
                              std::string t ("oid");
                              if (! (convs[0] =
                                     pgtype_from_spec (conn, t,
                                                       oct_types[0])))
                                {
                                  valid = 0;
                                  c_verror ("could not get converter for oid in copy-in");

                                  break;
                                }
                            }
                          else
                            {
                              if (cin_types(j).is_empty ())
                                {
                                  oct_types[j] = simple;

                                  if (! (convs[j] =
                                         pgtype_from_octtype (conn,
                                                              data(i, j))))
                                    {
                                      valid = 0;
                                      c_verror ("could not determine type in column %ld for copy-in",
                                                long (j));

                                      break;
                                    }
                                }
                              else
                                {
                                  bool err;
                                  std::string s;
                                  SET_ERR (s = cin_types(j).string_value (),
                                           err);
                                  if (err)
                                    {
                                      valid = 0;
                                      c_verror ("column type specification no string");

                                      break;
                                    }

                                  if (! (convs[j] =
                                         pgtype_from_spec (conn, s,
                                                           oct_types[j])))
                                    {
                                      valid = 0;
                                      c_verror ("invalid column type specification");

                                      break;
                                    }
                                }
                            }
                        } // ! convs [j]

                      oct_pq_dynvec_t val;

                      bool conversion_failed = false;
                      switch (oct_types[j])
                        {
                        case simple:
                          if (convs[j]->from_octave_bin (conn, data(i, j),
                                                         val))
                            conversion_failed = true;
                          break;

                        case array:
                          if (from_octave_bin_array (conn, data(i, j), val,
                                                     convs[j]))
                            conversion_failed = true;
                          break;

                        case composite:
                          if (from_octave_bin_composite (conn, data(i, j),
                                                         val, convs[j]))
                            conversion_failed = true;
                          break;

                        default:
                          // should not get here
                          c_verror ("internal error, undefined type identifier");
                          conversion_failed = true;
                        }

                      if (conversion_failed)
                        {
                          valid = 0;
                          // Reported with c_verror like all other
                          // errors here, so that the copy operation
                          // still gets terminated with PQputCopyEnd
                          // below.
                          c_verror ("could not convert data(%ld, %ld) for copy-in",
                                    long (i), long (j));
                        }
                      else
                        {
                          uint32_t t = htobe32 (uint32_t (val.size ()));
                          if (PQputCopyData (cptr, (char *) &t, 4) == -1)
                            {
                              valid = 0;
                              c_verror ("could not send data length in copy-in");
                            }
                          // For a zero-length value nothing is left to
                          // send, and front () must not be called on an
                          // empty vector.
                          else if (val.size () > 0
                                   && PQputCopyData (cptr, &(val.front ()),
                                                     val.size ()) == -1)
                            {
                              valid = 0;
                              c_verror ("could not send copy-in data");
                            }
                        }

                      if (! valid)
                        break;
                    }
                } // columns of argument data

              if (! valid)
                {
                  PQputCopyEnd (cptr, "error sending copy-in data");

                  c_verror ("server error: %s", PQerrorMessage (cptr));

                  break;
                }
            } // rows of argument data
        }

      if (valid)
        if (PQputCopyData (cptr, trailer, 2) == -1)
          {
            valid = 0;

            PQputCopyEnd (cptr, "could not send trailer");

            c_verror ("%s", PQerrorMessage (cptr));
          }

      if (valid)
        {
          if (PQputCopyEnd (cptr, NULL) == -1)
            {
              valid = 0;
              c_verror ("%s", PQerrorMessage (cptr));
            }
          else
            {
              // replace the COPY_IN result by the final command result
              PQclear (res);

              if ((res = PQgetResult (cptr)))
                {
                  if ((state = PQresultStatus (res)) == PGRES_FATAL_ERROR)
                    {
                      valid = 0;
                      c_verror ("server error in copy-in: %s",
                                PQerrorMessage (cptr));
                    }
                }
              else
                {
                  valid = 0;
                  c_verror ("unexpectedly got no result information");
                }
            }
        }
    } // copy from variable

  return octave_value (std::string ("copy in"));
}