Go to the documentation of this file.00001 #include "Database.h"
00002
00003 #include <poll.h>
00004 #include <libpqtypes.h>
00005 #include "psql.h"
00006 #include "PgSqlException.h"
00007
00008 namespace psql
00009 {
00010
00011 void Database::begin_transaction()
00012 {
00013 execute_sql(*this, "BEGIN TRANSACTION");
00014 }
00015
00016 void Database::commit_transaction()
00017 {
00018 savepoints.clear();
00019 execute_sql(*this, "COMMIT TRANSACTION");
00020 cursors.clear();
00021 }
00022
00023 void Database::rollback_transaction()
00024 {
00025 savepoints.clear();
00026 execute_sql(*this, "ROLLBACK TRANSACTION");
00027 cursors.clear();
00028 }
00029
00030 void Database::savepoint(const std::string& name)
00031 {
00032 if (savepoints.count(name))
00033 execute_sql(*this, "RELEASE SAVEPOINT " + name);
00034
00035 execute_sql(*this, "SAVEPOINT " + name);
00036 savepoints.insert(name);
00037 }
00038 void Database::rollback_to_savepoint(const std::string& name)
00039 {
00040 execute_sql(*this, "ROLLBACK TO SAVEPOINT " + name);
00041 }
00042
00043 void Database::analyze()
00044 {
00045 execute_sql(*this, "ANALYZE");
00046 }
00047
00048 void Database::set_schema(std::string schema)
00049 {
00050 if (schema != "")
00051 schema += ",";
00052 execute_sql(*this, "SET search_path TO " + schema + "public");
00053 }
00054
00055 void Database::create_schema(const std::string& schema)
00056 {
00057 execute_sql(*this, "CREATE SCHEMA " + schema);
00058 }
00059
00060 bool Database::in_transaction()
00061 {
00062 auto c = get_db();
00063 auto res = PQtransactionStatus(c);
00064 return res == PQTRANS_INTRANS;
00065 }
00066
00067 bool Database::in_failed_transaction()
00068 {
00069 auto c = get_db();
00070 auto res = PQtransactionStatus(c);
00071 return res == PQTRANS_INERROR;
00072 }
00073
00074 void Database::add_cursor(ICursor* curs)
00075 {
00076 cursors.insert(curs);
00077 }
00078
00079 void Database::remove_cursor(ICursor* curs)
00080 {
00081 cursors.erase(curs);
00082 }
00083
00084 bool Database::is_cursor(ICursor* curs) const
00085 {
00086 return cursors.find(curs) != cursors.end();
00087 }
00088
00089 void noticeReceiver(void* arg, const PGresult* res)
00090 {
00091 ((Database*)arg)->receiveNotice(res);
00092 }
00093
00094 Database::Database(const std::string& conninfo, bool synchr)
00095 : conn(NULL), conn_synchr(synchr)
00096 {
00097 if (synchr)
00098 {
00099 conn = PQconnectdb(conninfo.c_str());
00100 }
00101 else
00102 {
00103 conn = PQconnectStart(conninfo.c_str());
00104 }
00105 if (conn == NULL)
00106 throw PgSqlException("Unable to allocate memory for connection object");
00107 async = !synchr;
00108 if (PQstatus(conn) == CONNECTION_BAD)
00109 throw PgSqlException("Unable to connect to postgresql server: " + std::string(PQerrorMessage(conn)));
00110
00111 if (!async && PQinitTypes(conn) == 0)
00112 throw PgSqlException("Error initializing libpqtypes: " + std::string(PQgeterror()));
00113
00114 if (!async)
00115 {
00116 PQsetNoticeReceiver(conn, noticeReceiver, (void*)((((((this)))))));
00117 }
00118 }
00119
00120 PGconn* Database::get_db()
00121 {
00122 if (async)
00123 {
00124 int sock = PQsocket(conn);
00125 struct pollfd fd;
00126 fd.fd = sock;
00127 fd.events = POLLIN | POLLOUT;
00128 int status = PGRES_POLLING_WRITING;
00129 while (status != PGRES_POLLING_FAILED && status != PGRES_POLLING_OK)
00130 {
00131 poll(&fd, 1, -1);
00132 status = PQconnectPoll(conn);
00133 }
00134 if (PQstatus(conn) == CONNECTION_BAD)
00135 throw PgSqlException("Unable to connect to postgresql server: " + std::string(PQerrorMessage(conn)));
00136 async = false;
00137 if (PQinitTypes(conn) == 0)
00138 throw PgSqlException("Error initializing libpqtypes: " + std::string(PQgeterror()));
00139 PQsetNoticeReceiver(conn, noticeReceiver, (void*)this);
00140 }
00141 else
00142 {
00143 if (PQstatus(conn) == CONNECTION_BAD)
00144 {
00145 PQreset(conn);
00146 if (PQstatus(conn) == CONNECTION_BAD)
00147 throw PgSqlException("Connection to server lost: " + std::string(PQerrorMessage(conn)));
00148 }
00149 }
00150 unsigned int dealloc_target = 0;
00151 while (to_dealloc.size() > dealloc_target)
00152 {
00153 std::string name(to_dealloc[dealloc_target]);
00154 auto res = PQexec(conn, ("DEALLOCATE " + name).c_str());
00155 if (res == NULL || PQresultStatus(res) != PGRES_COMMAND_OK)
00156 {
00157 dealloc_target++;
00158 }
00159 else
00160 {
00161 to_dealloc.erase(to_dealloc.begin() + dealloc_target);
00162 }
00163 if (res != NULL)
00164 PQclear(res);
00165 }
00166 return conn;
00167 }
00168
00169 Database::~Database()
00170 {
00171 if (conn != NULL)
00172 PQfinish(conn);
00173 }
00174
00175 void Database::regist(const std::string& name, const std::string& sql, IStatement* st)
00176 {
00177 auto res = PQprepare(get_db(), name.c_str(), sql.c_str(), 0, NULL);
00178 if (res == NULL)
00179 throw PgSqlException("Error when preparing statement " + std::string(PQerrorMessage(conn)));
00180 if (PQresultStatus(res) != PGRES_COMMAND_OK)
00181 {
00182 std::string str(PQresultErrorMessage(res));
00183 PQclear(res);
00184 throw PgSqlException("Error when preparing statement " + str);
00185 }
00186 PQclear(res);
00187 stmts[name] = st;
00188 }
00189
00190 void Database::unregist(const std::string& name, IStatement* st)
00191 {
00192 if (!stmts.count(name))
00193 throw PgSqlException("Unregistering not registered statement");
00194
00195 if (stmts[name] != st)
00196 return;
00197
00198 auto res = PQexec(conn, ("DEALLOCATE " + name).c_str());
00199 if (res == NULL || PQresultStatus(res) != PGRES_COMMAND_OK)
00200 {
00201 if (res != NULL)
00202 PQclear(res);
00203 to_dealloc.push_back(name);
00204 }
00205 }
00206 boost::signal<void(const PGresult& )> & Database::notice_signal()
00207 {
00208 return notice_sig;
00209 }
00210
00211 void Database::receiveNotice(const PGresult* res)
00212 {
00213 notice_sig(*res);
00214 }
00215
00216 }