// PostgreSQL asynchronous client that emulates Redis-style data structures.
#include <assert.h>

#include <chrono>
#include <ctime>
#include <functional>
#include <iostream>
#include <list>
#include <memory>
#include <queue>
#include <sstream>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

#include "fdset.h"
#include "libpq-events.h"
#include "libpq-fe.h"
#include "libpq/libpq-fs.h"
using namespace std;
class AsyncPGClient
{
public:
/*TODO::传递错误信息*/
typedef std::function<
void(
const PGresult*)>
RESULT_CALLBACK;
typedef std::function<
void(
bool value)>
BOOL_RESULT_CALLBACK;
typedef std::function<
void(
const string& value)>
STRING_RESULT_CALLBACK;
typedef std::function<
void(
const std::unordered_map<
string,
string>& value)>
STRINGMAP_RESULT_CALLBACK;
AsyncPGClient() : mKVTableName("kv_data"), mHashTableName(
"hashmap_data")
{
mfdset =
ox_fdset_new();
}
~
AsyncPGClient()
{
for (auto&
kv : mConnections)
{
PQfinish((*
kv.second).pgconn);
}
ox_fdset_delete(mfdset);
mfdset =
nullptr;
}
void get(
const string& key,
const STRING_RESULT_CALLBACK& callback =
nullptr)
{
mStringStream <<
"SELECT key, value FROM public." << mKVTableName <<
" where key = ‘" << key <<
"‘;";
postQuery(mStringStream.str(), [callback](const PGresult*
result){
if (callback != nullptr && result !=
nullptr)
{
if (PQntuples(result) ==
1 && PQnfields(result) ==
2)
{
callback(PQgetvalue(result, 0,
1));
}
}
});
}
void set(
const string& key,
const string& v,
const BOOL_RESULT_CALLBACK& callback =
nullptr)
{
mStringStream <<
"INSERT INTO public." << mKVTableName <<
"(key, value) VALUES(‘" << key <<
"‘, ‘" << v <<
"‘) ON CONFLICT(key) DO UPDATE SET value = EXCLUDED.value;";
postQuery(mStringStream.str(), [callback](const PGresult*
result){
if (callback !=
nullptr)
{
if (PQresultStatus(result) ==
PGRES_COMMAND_OK)
{
callback(true);
}
else
{
cout <<
PQresultErrorMessage(result);
callback(false);
}
}
});
}
void hget(
const string& hashname,
const string& key,
const STRING_RESULT_CALLBACK& callback =
nullptr)
{
hmget(hashname, { key }, [callback](const std::unordered_map<
string,
string>&
value){
if (callback != nullptr && !
value.empty())
{
callback((*
value.begin()).second);
}
});
}
void hmget(
const string& hashname,
const std::vector<
string>& keys,
const STRINGMAP_RESULT_CALLBACK& callback =
nullptr)
{
mStringStream <<
"SELECT key, value FROM public." << mHashTableName <<
" where ";
auto it =
keys.begin();
do
{
mStringStream <<
"key=‘" << (*it) <<
"‘";
++
it;
} while (it != keys.end() && &(mStringStream <<
" or ") !=
nullptr);
mStringStream <<
";";
postQuery(mStringStream.str(), [callback](const PGresult*
result){
if (callback !=
nullptr)
{
std::unordered_map<
string,
string>
ret;
if (PQresultStatus(result) ==
PGRES_TUPLES_OK)
{
int num =
PQntuples(result);
int fileds =
PQnfields(result);
if (fileds ==
2)
{
for (
int i =
0; i < num; i++
)
{
ret[PQgetvalue(result, i, 0)] = PQgetvalue(result, i,
1);
}
}
}
callback(ret);
}
});
}
void hset(
const string& hashname,
const string& key,
const string& value,
const BOOL_RESULT_CALLBACK& callback =
nullptr)
{
mStringStream <<
"INSERT INTO public." << mHashTableName <<
"(hashname, key, value) VALUES(‘" << hashname <<
"‘, ‘" << key <<
"‘, ‘" <<
value
<<
"‘) ON CONFLICT (hashname, key) DO UPDATE SET value = EXCLUDED.value;";
postQuery(mStringStream.str(), [callback](const PGresult*
result){
if (callback !=
nullptr)
{
callback(PQresultStatus(result) ==
PGRES_COMMAND_OK);
}
});
}
void hgetall(
const string& hashname,
const STRINGMAP_RESULT_CALLBACK& callback =
nullptr)
{
mStringStream <<
"SELECT key, value FROM public." << mHashTableName <<
" where hashname = ‘" << hashname <<
"‘;";
postQuery(mStringStream.str(), [callback](const PGresult*
result){
if (callback !=
nullptr)
{
std::unordered_map<
string,
string>
ret;
if (PQresultStatus(result) ==
PGRES_TUPLES_OK)
{
int num =
PQntuples(result);
int fileds =
PQnfields(result);
if (fileds ==
2)
{
for (
int i =
0; i < num; i++
)
{
ret[PQgetvalue(result, i, 0)] = PQgetvalue(result, i,
1);
}
}
}
callback(ret);
}
});
}
void postQuery(
const string&& query,
const RESULT_CALLBACK& callback =
nullptr)
{
mPendingQuery.push({ std::move(query), callback});
mStringStream.str(std::string());
mStringStream.clear();
}
void postQuery(
const string& query,
const RESULT_CALLBACK& callback =
nullptr)
{
mPendingQuery.push({ query, callback });
mStringStream.str(std::string());
mStringStream.clear();
}
public:
void poll(
int millSecond)
{
ox_fdset_poll(mfdset, millSecond);
std::vector<
int>
closeFds;
for (auto&
it : mConnections)
{
auto fd =
it.first;
auto connection =
it.second;
auto pgconn = connection->
pgconn;
if (ox_fdset_check(mfdset, fd, ReadCheck))
{
if (PQconsumeInput(pgconn) >
0 && PQisBusy(pgconn) ==
0)
{
bool successGetResult =
false;
while (
true)
{
auto result =
PQgetResult(pgconn);
if (result !=
nullptr)
{
successGetResult =
true;
if (connection->callback !=
nullptr)
{
connection->
callback(result);
connection->callback =
nullptr;
}
PQclear(result);
}
else
{
break;
}
}
if (successGetResult)
{
mIdleConnections.push_back(connection);
}
}
if (PQstatus(pgconn) ==
CONNECTION_BAD)
{
closeFds.push_back(fd);
}
}
if (ox_fdset_check(mfdset, fd, WriteCheck))
{
if (PQflush(pgconn) ==
0)
{
//移除可写检测
ox_fdset_del(mfdset, fd, WriteCheck);
}
}
}
for (auto&
v : closeFds)
{
removeConnection(v);
}
}
void trySendPendingQuery()
{
while (!mPendingQuery.empty() && !
mIdleConnections.empty())
{
auto& query =
mPendingQuery.front();
auto& connection =
mIdleConnections.front();
if (PQsendQuery(connection->pgconn, query.request.c_str()) ==
0)
{
cout << PQerrorMessage(connection->pgconn) <<
endl;
if (query.callback !=
nullptr)
{
query.callback(nullptr);
}
}
else
{
ox_fdset_add(mfdset, PQsocket(connection->
pgconn), WriteCheck);
connection->callback =
query.callback;
}
mPendingQuery.pop();
mIdleConnections.pop_front();
}
}
size_t pendingQueryNum() const
{
return mPendingQuery.size();
}
size_t getWorkingQuery() const
{
return mConnections.size() -
mIdleConnections.size();
}
void createConnection(
const char *pghost,
const char *
pgport,
const char *pgoptions,
const char *
pgtty,
const char *dbName,
const char *login,
const char *
pwd,
int num)
{
for (
int i =
0; i < num; i++
)
{
auto pgconn =
PQsetdbLogin(pghost, pgport, pgoptions, pgtty, dbName, login, pwd);
if (PQstatus(pgconn) ==
CONNECTION_OK)
{
auto connection = std::make_shared<Connection>
(pgconn, nullptr);
mConnections[PQsocket(pgconn)] =
connection;
PQsetnonblocking(pgconn, 1);
ox_fdset_add(mfdset, PQsocket(pgconn), ReadCheck);
mIdleConnections.push_back(connection);
}
else
{
cout <<
PQerrorMessage(pgconn);
PQfinish(pgconn);
pgconn =
nullptr;
}
}
if (!
mConnections.empty())
{
sCreateTable((*mConnections.begin()).second->
pgconn, mKVTableName, mHashTableName);
}
}
private:
void removeConnection(
int fd)
{
auto it =
mConnections.find(fd);
if (it !=
mConnections.end())
{
auto connection = (*
it).second;
for (auto it = mIdleConnections.begin(); it != mIdleConnections.end(); ++
it)
{
if ((*it)->pgconn == connection->
pgconn)
{
mIdleConnections.erase(it);
break;
}
}
ox_fdset_del(mfdset, fd, ReadCheck |
WriteCheck);
PQfinish(connection->
pgconn);
mConnections.erase(fd);
}
}
private:
static void sCreateTable(PGconn* conn,
const string& kvTableName,
const string&
hashTableName)
{
{
string query =
"CREATE TABLE public.";
query +=
kvTableName;
query +=
"(key character varying NOT NULL, value json, CONSTRAINT key PRIMARY KEY(key))";
PGresult* exeResult =
PQexec(conn, query.c_str());
auto status =
PQresultStatus(exeResult);
auto errorStr =
PQresultErrorMessage(exeResult);
PQclear(exeResult);
}
{
string query =
"CREATE TABLE public.";
query +=
hashTableName;
query +=
"(hashname character varying, key character varying, value json, "
"CONSTRAINT hk PRIMARY KEY (hashname, key))";
PGresult* exeResult =
PQexec(conn, query.c_str());
auto status =
PQresultStatus(exeResult);
auto errorStr =
PQresultErrorMessage(exeResult);
PQclear(exeResult);
}
}
private:
struct QueryAndCallback
{
std::string request;
RESULT_CALLBACK callback;
};
struct Connection
{
PGconn*
pgconn;
RESULT_CALLBACK callback;
Connection(PGconn*
p, RESULT_CALLBACK c)
{
pgconn =
p;
callback =
c;
}
};
const string mKVTableName;
const string mHashTableName;
stringstream mStringStream;
fdset_s*
mfdset;
std::unordered_map<
int, shared_ptr<Connection>>
mConnections;
std::list<shared_ptr<Connection>>
mIdleConnections;
std::queue<QueryAndCallback>
mPendingQuery;
/*TODO::监听wakeup支持*/
/*TODO::考虑固定分配connection给某业务*/
/*TODO::编写储存过程,替换现有的hashtable模拟方式,如循环使用jsonb_set以及 select value->k1, value->k2 from ...*/
/*TODO::编写储存过程,实现list*/
};
/*
 * Demo driver: opens a small connection pool, queues a batch of identical
 * queries followed by one call to each high-level operation, pumps the client
 * until everything has completed, then prints the elapsed wall-clock time.
 */
int main()
{
    using std::chrono::system_clock;

    // Number of bulk queries queued before the individual demo calls.
    const int kQueryCount = 100000;
    // Set to true to queue upserts with unique keys instead of SELECTs.
    const bool kBenchmarkInserts = false;

    AsyncPGClient asyncClient;
    // NOTE(review): host and credentials are hard-coded; move them to
    // configuration before using this outside a local test setup.
    asyncClient.createConnection("192.168.12.1", "5432", nullptr, nullptr,
                                 "postgres", "postgres", "19870323", 8);

    const system_clock::time_point startTime = system_clock::now();
    const auto nowTime = std::time(nullptr);

    for (int i = 0; i < kQueryCount; i++)
    {
        if (kBenchmarkInserts)
        {
            std::string test = "INSERT INTO public.kv_data(key, value) VALUES ('";
            test += std::to_string(nowTime * 1000 + i);
            test += "', '{\"hp\":100000}') ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value;";
            asyncClient.postQuery(test);
        }
        else
        {
            asyncClient.postQuery("select * from public.kv_data where key='dd';");
        }
    }

    asyncClient.postQuery("INSERT INTO public.kv_data(key, value) VALUES ('dodo5', '{\"hp\":100000}') "
                          " ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value",
                          [](const PGresult*) {
                              std::cout << "fuck" << std::endl;
                          });

    asyncClient.get("dd", [](const std::string& value) {
        std::cout << "get dd : " << value << std::endl;
    });

    asyncClient.set("dd", "{\"hp\":456}", [](bool isOK) {
        std::cout << "set dd : " << isOK << std::endl;
    });

    asyncClient.hget("heros:dodo", "hp", [](const std::string& value) {
        std::cout << "hget heros:dodo:" << value << std::endl;
    });

    asyncClient.hset("heros:dodo", "hp", "{\"hp\":1}", [](bool isOK) {
        std::cout << "hset heros:dodo:" << isOK << std::endl;
    });

    asyncClient.hmget("heros:dodo", { "hp", "money" },
                      [](const std::unordered_map<std::string, std::string>& kvs) {
                          std::cout << "hmget:" << std::endl;
                          for (const auto& kv : kvs)
                          {
                              std::cout << kv.first << " : " << kv.second << std::endl;
                          }
                      });

    asyncClient.hgetall("heros:dodo",
                        [](const std::unordered_map<std::string, std::string>& kvs) {
                            std::cout << "hgetall:" << std::endl;
                            for (const auto& kv : kvs)
                            {
                                std::cout << kv.first << " : " << kv.second << std::endl;
                            }
                        });

    // Pump the client until nothing is queued and nothing is in flight.
    // NOTE(review): if no connection was established, queued queries are
    // never sent, pendingQueryNum() stays non-zero and this loop never ends.
    for (;;)
    {
        asyncClient.poll(1);
        asyncClient.trySendPendingQuery();

        const bool drained = asyncClient.pendingQueryNum() == 0 &&
                             asyncClient.getWorkingQuery() == 0;
        if (drained)
        {
            break;
        }
    }

    const auto elapsed = system_clock::now() - startTime;
    std::cout << "cost :" << std::chrono::duration<double>(elapsed).count() << "s" << std::endl;

    std::cout << "enter any key exit" << std::endl;
    std::cin.get();
    return 0;
}
// Source: https://github.com/IronsDu/accumulation-dev/blob/master/examples/Pgedis.cpp
// PostgreSQL asynchronous client (emulating Redis data structures)
// Tags: