1 Star 0 Fork 0

大量/mORMot

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
SynDBPostgres.pas 33.86 KB
一键复制 编辑 原始数据 按行查看 历史

/// PostgreSQL direct access classes for SynDB units (not DB.pas based)
// - this unit is a part of the freeware Synopse framework,
// licensed under a MPL/GPL/LGPL tri-license - see LICENSE.md
unit SynDBPostgres;
{
*****************************************************************************
Implementation of TSQLDB* for PostgreSQL using libpg
Features:
- fast, minimum memory allocation
- includes its own simple wrapper to the libpq native client
- perfect fit for our ORM (JSON results, bulk insert/update/delete)
- array binding for select statements (caller should use =ANY(?) etc.)
Limitations:
- works with PostgreSQL>=7.4 and (v3 protocol)
- consider creating the database with UTF8 collation
- notifications are not implemented
- Postgres level prepared cached statements works only for SQLs what starts
exactly with SELECT INSERT UPDATE DELETE VALUES and not contains ";"
- parameters parser will fails in case SQL contains comments with ? inside
(TODO - will be fixed)
- all query rows are returned at once, caller should care about pagination
(TODO - implement singleRowMode?)
Aim of this unit is to provide simple alternative to SynDBZeos for PostgreSQL
*****************************************************************************
}
interface
{$I Synopse.inc} // define HASINLINE CPU32 CPU64 OWNNORMTOUPPER
uses
{$ifdef MSWINDOWS}
Windows,
{$endif}
SysUtils,
SynCommons,
SynTable,
SynDB;
type
/// exception type associated to the native libpg Interface
ESQLDBPostgres = class(ESQLDBException);
/// connection properties which will implement an internal Thread-Safe
// connection pool
TSQLDBPostgresConnectionProperties = class(TSQLDBConnectionPropertiesThreadSafe)
private
fOids: TWordDynArray; // O(n) search in L1 cache - use SSE2 on FPC x86_64
fOidsFieldTypes: TSQLDBFieldTypeDynArray;
fOidsCount: integer;
protected
procedure GetForeignKeys; override;
/// fill mapping of standard OID
// - at runtime mapping can be defined using Oid2FieldType() method
// - OIDs defined in DB can be retrieved using query
// "select oid, typname from pg_type where typtype = 'b' order by oid"
procedure FillOidMapping; virtual;
public
/// initialize the properties
// - raise an exception in case libpg is not thead-safe
// - aDatabaseName can be a Connection URI - see
// https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING
// - if aDatabaseName contains connection URI with password we recommend to repeat password
// in aPassword parameter to prevent logging it (see TSQLDBConnectionProperties.DatabaseNameSafe)
// - better to use environment variables and postgres config file for connection parameters
constructor Create(const aServerName, aDatabaseName, aUserID, aPassword: RawUTF8); override;
/// create a new connection
// - caller is responsible of freeing this instance
// - this overridden method will create an TSQLDBPostgresConnection instance
function NewConnection: TSQLDBConnection; override;
/// add or replace mapping of OID into TSQLDBFieldType
// - in case mapping for OID is not defined, returns ftUTF8
function Oid2FieldType(cOID: cardinal): TSQLDBFieldType; {$ifdef HASINLINE} inline; {$endif}
// add new (or override existed) OID to FieldType mapping
procedure MapOid(cOid: cardinal; fieldType: TSQLDBFieldType);
end;
/// implements a connection via the libpq access layer
TSQLDBPostgresConnection = class(TSQLDBConnectionThreadSafe)
protected
// prepared statement names = SHA-256 of its SQL
fPrepared: THash256DynArray; // O(n) fast search in L1 cache
fPreparedCount: integer;
// the associated low-level provider connection
fPGConn: pointer;
// fServerSettings: set of (ssByteAasHex);
// maintain fPrepared[] hash list to identify already cached
// - returns statement index in prepared cache array
function PrepareCached(const aSQL: RawUTF8; aParamCount: integer;
out aName: RawUTF8): integer;
/// direct execution of SQL statement what do not returns a result
// - statement should not contains parameters
// - raise an ESQLDBPostgres on error
procedure DirectExecSQL(const SQL: RawUTF8); overload;
/// direct execution of SQL statement what do not returns a result
// - overloaded method to return a single value e.g. from a SELECT
procedure DirectExecSQL(const SQL: RawUTF8; out Value: RawUTF8); overload;
/// query the pg_settings table for a given setting
function GetServerSetting(const Name: RawUTF8): RawUTF8;
public
/// connect to the specified server
// - should raise an ESQLDBPostgres on error
procedure Connect; override;
/// stop connection to the specified PostgreSQL database server
// - should raise an ESQLDBPostgres on error
procedure Disconnect; override;
/// return TRUE if Connect has been already successfully called
function IsConnected: boolean; override;
/// create a new statement instance
function NewStatement: TSQLDBStatement; override;
/// begin a Transaction for this connection
procedure StartTransaction; override;
/// commit changes of a Transaction for this connection
// - StartTransaction method must have been called before
procedure Commit; override;
/// discard changes of a Transaction for this connection
// - StartTransaction method must have been called before
procedure Rollback; override;
/// direct access to the associated PPGconn connection
property Direct: pointer read fPGConn;
/// how many prepared statements are currently cached for this connection
property PreparedCount: integer read fPreparedCount;
end;
/// implements a statement via a Postgres database connection
TSQLDBPostgresStatement = class(TSQLDBStatementWithParamsAndColumns)
protected
fPreparedStmtName: RawUTF8; // = SHA-256 of the SQL
fPreparedParamsCount: integer;
fRes: pointer;
fResStatus: integer;
// pointers to query parameters; initialized by Prepare, filled in Executeprepared
fPGParams: TPointerDynArray;
// 0 - text, 1 - binary; initialized by Prepare, filled in Executeprepared
fPGParamFormats: TIntegerDynArray;
// non zero for binary params
fPGparamLengths: TIntegerDynArray;
/// define the result columns name and content
procedure BindColumns;
/// raise an exception if Col is out of range according to fColumnCount
// or rowset is not initialized
procedure CheckColAndRowset(const Col: integer);
public
/// finalize the statement for a given connection
destructor Destroy; override;
/// Prepare an UTF-8 encoded SQL statement
// - parameters marked as ? will be bound later, before ExecutePrepared call
// - if ExpectResults is TRUE, then Step() and Column*() methods are available
// to retrieve the data rows
// - raise an ESQLDBPostgres on any error
procedure Prepare(const aSQL: RawUTF8; ExpectResults: boolean = False); overload; override;
/// Execute a prepared SQL statement
// - parameters marked as ? should have been already bound with Bind*() functions
// - this implementation will also handle bound array of values (if any)
// - this overridden method will log the SQL statement if sllSQL has been
// enabled in SynDBLog.Family.Level
// - raise an ESQLDBPostgres on any error
procedure ExecutePrepared; override;
/// gets a number of updates made by latest executed statement
function UpdateCount: integer; override;
/// Reset the previous prepared statement
// - this overridden implementation will reset all bindings and the cursor state
// - raise an ESQLDBPostgres on any error
procedure Reset; override;
/// Access the next or first row of data from the SQL Statement result
// - return true on success, with data ready to be retrieved by Column*() methods
// - return false if no more row is available (e.g. if the SQL statement
// is not a SELECT but an UPDATE or INSERT command)
// - if SeekFirst is TRUE, will put the cursor on the first row of results
// - raise an ESQLDBPostgres on any error
function Step(SeekFirst: boolean = False): boolean; override;
/// clear(fRes) when ISQLDBStatement is back in cache
procedure ReleaseRows; override;
/// return a Column integer value of the current Row, first Col is 0
function ColumnInt(Col: integer): int64; override;
/// returns TRUE if the column contains NULL
function ColumnNull(Col: integer): boolean; override;
/// return a Column floating point value of the current Row, first Col is 0
function ColumnDouble(Col: integer): double; override;
/// return a Column date and time value of the current Row, first Col is 0
function ColumnDateTime(Col: integer): TDateTime; override;
/// return a Column currency value of the current Row, first Col is 0
function ColumnCurrency(Col: integer): currency; override;
/// return a Column UTF-8 encoded text value of the current Row, first Col is 0
function ColumnUTF8(Col: integer): RawUTF8; override;
/// return a Column as a blob value of the current Row, first Col is 0
function ColumnBlob(Col: integer): RawByteString; override;
/// append all columns values of the current Row to a JSON stream
// - overriden method to avoid temporary memory allocation or conversion
procedure ColumnsToJSON(WR: TJSONWriter); override;
/// how many parameters founded during prepare stage
property PreparedParamsCount: integer read fPreparedParamsCount;
end;
var
/// allow to specify a libpq library file name to use
SynDBPostgresLibrary: TFileName;
implementation
uses
SynLog,
SynCrypto; // libpq requires named prepared statements = use SHA-256
{ *********** minimal access to libpq native Postgres client library }
const // see pg_type.h
BOOLOID = 16;
BYTEAOID = 17;
INT8OID = 20;
INT2OID = 21;
INT4OID = 23;
REGPROCOID = 24;
TEXTOID = 25;
OIDOID = 26;
FLOAT4OID = 700;
FLOAT8OID = 701;
ABSTIMEOID = 702;
CASHOID = 790;
DATEOID = 1082;
TIMEOID = 1083;
TIMESTAMPOID = 1114;
TIMESTAMPTZOID = 1184;
TIMETZOID = 1266;
NUMERICOID = 1700;
CHAROID = 18;
NAMEOID = 19;
INT2VECTOROID = 22;
TIDOID = 27;
XIDOID = 28;
CIDOID = 29;
OIDVECTOROID = 30;
JSONOID = 114;
XMLOID = 142;
PGNODETREEOID = 194;
PGDDLCOMMANDOID = 32;
POINTOID= 600;
LSEGOID = 601;
PATHOID = 602;
BOXOID = 603;
POLYGONOID = 604;
LINEOID = 628;
RELTIMEOID = 703;
TINTERVALOID = 704;
UNKNOWNOID = 705;
CIRCLEOID = 718;
MACADDROID = 829;
INETOID = 869;
CIDROID = 650;
INT2ARRAYOID = 1005;
INT4ARRAYOID = 1007;
TEXTARRAYOID= 1009;
OIDARRAYOID = 1028;
FLOAT4ARRAYOID = 1021;
ACLITEMOID = 1033;
CSTRINGARRAYOID = 1263;
BPCHAROID = 1042;
VARCHAROID = 1043;
INTERVALOID = 1186;
BITOID = 1560;
VARBITOID = 1562;
REFCURSOROID= 1790;
REGPROCEDUREOID = 2202;
REGOPEROID = 2203;
REGOPERATOROID = 2204;
REGCLASSOID = 2205;
REGTYPEOID = 2206;
REGROLEOID = 4096;
REGNAMESPACEOID = 4089;
REGTYPEARRAYOID = 2211;
UUIDOID = 2950;
LSNOID = 3220;
TSVECTOROID = 3614;
GTSVECTOROID = 3642;
TSQUERYOID = 3615;
REGCONFIGOID = 3734;
REGDICTIONARYOID = 3769;
JSONBOID = 3802;
INT4RANGEOID = 3904;
const
PGRES_EMPTY_QUERY = 0;
PGRES_COMMAND_OK = 1;
PGRES_TUPLES_OK = 2;
PGRES_COPY_OUT = 3;
PGRES_COPY_IN = 4;
PGRES_BAD_RESPONSE = 5;
PGRES_NONFATAL_ERROR = 6;
PGRES_FATAL_ERROR = 7;
CONNECTION_OK = 0;
CONNECTION_BAD = 1;
CONNECTION_STARTED = 2;
CONNECTION_MADE = 3;
CONNECTION_AWAITING_RESPONSE = 4;
CONNECTION_AUTH_OK = 5;
CONNECTION_SETENV = 6;
CONNECTION_SSL_STARTUP = 7;
CONNECTION_NEEDED = 8;
PGFMT_TEXT = 0;
PGFMT_BIN = 1;
type
PPGconn = type pointer;
PPGresult = type pointer;
PPPGresult = ^PPGresult;
PQnoticeProcessor = procedure(arg: pointer; message: PUTF8Char); cdecl;
/// direct access to the libpq native Postgres protocol 3 library
// - only the endpoints needed by this unit are imported
TSQLDBPostgresLib = class(TSQLDBLib)
public
LibVersion: function: integer; cdecl;
IsThreadSafe: function: integer; cdecl;
SetDBLogin: function(pghost, pgport, pgoptions, pgtty, dbName,
login, pwd: PUTF8Char): PPGconn; cdecl;
Status: function(conn: PPGconn): integer; cdecl;
Finish: procedure(conn: PPGconn); cdecl;
ResultStatus: function(res: PPGresult): integer; cdecl;
ResultErrorField: function(res: PPGresult; fieldcode: integer): PUTF8Char; cdecl;
ErrorMessage: function(conn: PPGconn): PUTF8Char; cdecl;
SetNoticeProcessor: function(conn: PPGconn; proc: PQnoticeProcessor;
arg: pointer): PQnoticeProcessor; cdecl;
Clear: procedure(res: PPGresult); cdecl;
Freemem: procedure(ptr: pointer); cdecl;
Exec: function(conn: PPGconn; query: PUTF8Char): PPGresult; cdecl;
Prepare: function(conn: PPGconn; stmtName, query: PUTF8Char; nParams: integer;
paramTypes: PCardinal): PPGresult; cdecl;
ExecPrepared: function(conn: PPGconn; stmtName: PUTF8Char; nParams: integer;
paramValues: PPchar; paramLengths, paramFormats: PInteger;
resultFormat: integer): PPGresult; cdecl;
ExecParams: function(conn: PPGconn; command: PUTF8Char; nParams: integer;
paramTypes: PCardinal; paramValues: PPchar; paramLengths, paramFormats: PInteger;
resultFormat: integer):PPGresult; cdecl;
nfields: function(res: PPGresult): integer; cdecl;
ntuples: function(res: PPGresult): integer; cdecl;
cmdTuples: function(res: PPGresult): PUTF8Char; cdecl;
fname: function(res: PPGresult; field_num: integer): PUTF8Char; cdecl;
ftype: function(res: PPGresult; field_num: integer): cardinal; cdecl;
GetValue: function(res: PPGresult; tup_num, field_num: integer): PUTF8Char; cdecl;
GetLength: function(res: PPGresult; tup_num, field_num: integer): integer; cdecl;
GetIsNull: function(res: PPGresult; tup_num, field_num: integer): integer; cdecl;
/// try to dynamically load the libpq library
// - raise ESQLDBPostgres if the expected library is not found
constructor Create;
/// just a wrapper around FastSetString + GetValue/GetLength
procedure GetRawUTF8(res: PPGresult; tup_num, field_num: integer;
var result: RawUTF8);
/// raise an exception on error and clean result
// - will set pRes to nil if passed
// - if andClear is true - will call always PQ.Clear(res)
procedure Check(conn: PPGconn; res: PPGresult;
pRes: PPPGresult = nil; andClear: boolean = true);
end;
const
PQ_ENTRIES: array[0..22] of PChar = (
'PQlibVersion', 'PQisthreadsafe', 'PQsetdbLogin', 'PQstatus', 'PQfinish',
'PQresultStatus', 'PQresultErrorField', 'PQerrorMessage', 'PQsetNoticeProcessor',
'PQclear', 'PQfreemem', 'PQexec', 'PQprepare', 'PQexecPrepared', 'PQexecParams',
'PQnfields', 'PQntuples', 'PQcmdTuples', 'PQfname', 'PQftype', 'PQgetvalue',
'PQgetlength', 'PQgetisnull');
var
PQ: TSQLDBPostgresLib = nil;
{ TSQLDBPostgresLib }
const
LIBNAME = {$ifdef MSWINDOWS}'libpq.dll'{$else}
{$ifdef darwin}'libpq.dylib'{$else}'libpq.so.5'{$endif}{$endif};
LIBNAME2 = {$ifdef MSWINDOWS}''{$else}
{$ifdef darwin}''{$else}'libpq.so.4'{$endif}{$endif};
constructor TSQLDBPostgresLib.Create;
var
P: PPointer;
i: PtrInt;
l2: TFileName;
begin
if LIBNAME2 <> '' then
l2 := ExeVersion.ProgramFilePath + LIBNAME2;
TryLoadLibrary([SynDBPostgresLibrary, ExeVersion.ProgramFilePath + LIBNAME,
l2, LIBNAME, LIBNAME2], ESQLDBPostgres);
P := @@LibVersion;
for i := 0 to High(PQ_ENTRIES) do
begin
P^ := GetProcAddress(fHandle, PQ_ENTRIES[i]);
if P^ = nil then
begin
FreeLibrary(fHandle);
fHandle := 0;
raise ESQLDBPostgres.CreateUTF8('Invalid %: missing %', [LIBNAME, PQ_ENTRIES[i]]);
end;
inc(P);
end;
end;
procedure TSQLDBPostgresLib.GetRawUTF8(res: PPGresult;
tup_num, field_num: integer; var result: RawUTF8);
begin
FastSetString(result, GetValue(res, tup_num, field_num),
GetLength(res, tup_num, field_num));
end;
procedure TSQLDBPostgresLib.Check(conn: PPGconn; res: PPGresult; pRes: PPPGresult;
andClear: boolean);
var
errMsg, errCode: PUTF8Char;
begin
if (res = nil) or // nil in case of very fatal error, out of emory for example
(ResultStatus(res) in [PGRES_BAD_RESPONSE, PGRES_NONFATAL_ERROR,
PGRES_FATAL_ERROR]) then
begin
errMsg := ErrorMessage(conn);
if res <> nil then
errCode := ResultErrorField(res, Ord('C'){PG_DIAG_SQLSTATE})
else
errCode := nil;
Clear(res);
if pRes <> nil then
pRes^ := nil;
raise ESQLDBPostgres.CreateUTF8('% PGERRCODE: %, %', [self, errCode, errMsg]);
end
else if andClear then
Clear(res);
end;
{ TSQLDBPostgresConnection }
function TSQLDBPostgresConnection.PrepareCached(const aSQL: RawUTF8; aParamCount: integer;
out aName: RawUTF8): integer;
var
dig: TSHA256Digest;
begin
dig := SHA256Digest(aSQL);
aName := SHA256DigestToString(dig);
result := Hash256Index(pointer(fPrepared), fPreparedCount, @dig);
if result >= 0 then
exit; // already prepared
PQ.Check(fPGConn, PQ.Prepare(fPGConn, pointer(aName), pointer(aSQL), aParamCount, nil));
result := fPreparedCount;
inc(fPreparedCount);
if result = length(fPrepared) then
SetLength(fPrepared, result + 32);
fPrepared[result] := dig;
end;
procedure TSQLDBPostgresConnection.DirectExecSQL(const SQL: RawUTF8);
begin
PQ.Check(fPGConn, PQ.Exec(fPGConn, pointer(SQL)));
end;
procedure TSQLDBPostgresConnection.DirectExecSQL(const SQL: RawUTF8; out Value: RawUTF8);
var
res: PPGresult;
begin
res := PQ.Exec(fPGConn, pointer(SQL));
PQ.Check(fPGConn, res, nil, {andclear=}false);
PQ.GetRawUTF8(res, 0, 0, Value);
PQ.Clear(res);
end;
function TSQLDBPostgresConnection.GetServerSetting(const Name: RawUTF8): RawUTF8;
var
sql: RawUTF8;
begin
FormatUTF8('select setting from pg_settings where name=''%''', [Name], sql);
DirectExecSQL(sql, result);
end;
// our conversion is faster than PQUnescapeByteA - which requires libpq 8.3+
// and calls malloc()
// https://github.com/postgres/postgres/blob/master/src/interfaces/libpq/fe-exec.c
// checking \x for hexadecimal encoding is what UnescapeByteA() does
// -> no need to ask server settings
// note: bytea_output is HEX by default (at least since PostgreSQL 9.0)
function BlobInPlaceDecode(P: PAnsiChar; PLen: integer): integer;
begin
if (P = nil) or (PLen <= 0) then
result := 0
else
if PWord(P)^ = ord('\') + ord('x') shl 8 then {ssByteAasHex in fServerSettings}
begin
result := (PLen - 2) shr 1; // skip trailing \x and compute number of bytes
if result > 0 then
HexToBinFast(P + 2, PByte(P), result); // in-place conversion
end
else
result := OctToBin(P, pointer(P)); // in-place conversion
end;
procedure SynLogNoticeProcessor({%H-}arg: Pointer; message: PUTF8Char); cdecl;
begin
SynDBLog.Add.Log(sllTrace, 'PGINFO: %', [message], TObject(arg));
end;
procedure DummyNoticeProcessor({%H-}arg: Pointer; message: PUTF8Char); cdecl;
begin
end;
procedure TSQLDBPostgresConnection.Connect;
var
log: ISynLog;
host, port: RawUtf8;
begin
log := SynDBLog.Enter(self, 'Connect');
Disconnect; // force fTrans=fError=fServer=fContext=nil
try
Split(Properties.ServerName, ':', host, port);
fPGConn := PQ.SetDBLogin(pointer(host), pointer(port), nil, nil,
pointer(Properties.DatabaseName), pointer(Properties.UserID),
pointer(Properties.PassWord));
if PQ.Status(fPGConn) = CONNECTION_BAD then
raise ESQLDBPostgres.CreateUTF8('Connection to database % failed [%]',
[Properties.DatabaseNameSafe, PQ.ErrorMessage(fPGConn)]);
// if GetServerSetting('bytea_output') = 'HEX' then
// include(fServerSettings, ssByteAasHex);
if log <> nil then
begin
PQ.SetNoticeProcessor(fPGConn, SynLogNoticeProcessor, pointer(self));
log.Log(sllDB, 'Connected to % % using % v%', [fProperties.ServerName,
fProperties.DatabaseNameSafe, PQ.fLibraryPath, PQ.LibVersion], self);
end
else
// to ensure no performance drop due to notice to console
PQ.SetNoticeProcessor(fPGConn, DummyNoticeProcessor, nil);
inherited Connect; // notify any re-connection
except
on E: Exception do
begin
if log <> nil then
log.Log(sllError, 'Connect: % on %', [E, Properties.DatabaseNameSafe], self);
Disconnect; // clean up on fail
raise;
end;
end;
end;
procedure TSQLDBPostgresConnection.Disconnect;
begin
try
inherited Disconnect;
finally
if fPGConn <> nil then
begin
PQ.Finish(fPGConn);
fPGConn := nil;
end;
end;
end;
function TSQLDBPostgresConnection.IsConnected: boolean;
begin
result := (fPGConn <> nil);
end;
function TSQLDBPostgresConnection.NewStatement: TSQLDBStatement;
begin
result := TSQLDBPostgresStatement.Create(self);
end;
procedure TSQLDBPostgresConnection.StartTransaction;
var
log: ISynLog;
begin
log := SynDBLog.Enter(self, 'StartTransaction');
if TransactionCount > 0 then
raise ESQLDBPostgres.CreateUTF8('Invalid %.StartTransaction: nested ' +
'transactions are not supported by the Postgres - use SAVEPOINT instead', [self]);
try
inherited StartTransaction;
DirectExecSQL('START TRANSACTION');
except
on E: Exception do
begin
if log <> nil then
log.Log(sllError, 'StartTransaction: % on %', [E, Properties.DatabaseNameSafe], self);
if fTransactionCount > 0 then
Dec(fTransactionCount);
raise;
end;
end;
end;
procedure TSQLDBPostgresConnection.Commit;
begin
inherited Commit;
try
DirectExecSQL('COMMIT');
except
inc(fTransactionCount); // the transaction is still active
raise;
end;
end;
procedure TSQLDBPostgresConnection.Rollback;
begin
inherited;
DirectExecSQL('ROLLBACK');
end;
{ TSQLDBPostgresConnectionProperties }
procedure TSQLDBPostgresConnectionProperties.GetForeignKeys;
begin
// TODO - how to get field we reference to? (currently consider this is "ID")
with Execute('SELECT' + ' ct.conname as foreign_key_name, ' +
' case when ct.condeferred then 1 else 0 end AS is_disabled, ' +
' (SELECT tc.relname from pg_class tc where tc.oid = ct.conrelid) || ''.'' || ' +
' (SELECT a.attname FROM pg_attribute a WHERE a.attnum = ct.conkey[1] AND a.attrelid = ct.conrelid) as from_ref, ' +
' (SELECT tc.relname from pg_class tc where tc.oid = ct.confrelid) || ''.id'' as referenced_object ' +
'FROM pg_constraint ct WHERE contype = ''f''', []) do
while Step do
fForeignKeys.Add(ColumnUTF8(2), ColumnUTF8(3));
end;
procedure TSQLDBPostgresConnectionProperties.FillOidMapping;
begin // see pg_type.h (most used first)
mapOid(INT4OID, ftInt64);
mapOid(INT8OID, ftInt64);
mapOid(TEXTOID, ftUTF8);
mapOid(FLOAT8OID, ftDouble);
mapOid(TIMESTAMPOID, ftDate);
mapOid(BYTEAOID, ftBlob);
mapOid(NUMERICOID, ftCurrency);// our ORM uses NUMERIC(19,4) for currency
mapOid(BOOLOID, ftInt64);
mapOid(INT2OID, ftInt64);
mapOid(CASHOID, ftCurrency);
mapOid(TIMESTAMPTZOID, ftDate);
mapOid(ABSTIMEOID, ftDate);
mapOid(DATEOID, ftDate);
mapOid(TIMEOID, ftDate);
mapOid(TIMETZOID, ftDate);
mapOid(REGPROCOID, ftInt64);
mapOid(OIDOID, ftInt64);
mapOid(FLOAT4OID, ftDouble);
end; // any unregistered OID will be handled as ftUTF8
constructor TSQLDBPostgresConnectionProperties.Create(
const aServerName, aDatabaseName, aUserID, aPassword: RawUTF8);
begin
GlobalLock;
try
if PQ = nil then
GarbageCollectorFreeAndNil(PQ, TSQLDBPostgresLib.Create);
finally
GlobalUnLock;
end;
if PQ.IsThreadSafe <> 1 then
raise ESQLDBPostgres.Create('libpq should be compiled in threadsafe mode');
fDBMS := dPostgreSQL;
FillOidMapping;
inherited Create(aServerName, aDatabaseName, aUserID, aPassWord);
// JSONDecodedPrepareToSQL will detect cPostgreBulkArray and set
// DecodedFieldTypesToUnnest -> fast bulk insert/delete/update
fBatchSendingAbilities := [cCreate, cDelete, cUpdate, cPostgreBulkArray];
// disable MultiInsert SQL and rely on cPostgreBulkArray process for cCreate
fOnBatchInsert := nil; // see TSQLRestStorageExternal.InternalBatchStop
end;
function TSQLDBPostgresConnectionProperties.NewConnection: TSQLDBConnection;
begin
result := TSQLDBPostgresConnection.Create(self);
end;
function TSQLDBPostgresConnectionProperties.Oid2FieldType(cOID: cardinal): TSQLDBFieldType;
var
i: PtrInt;
begin
if cOID <= 65535 then
begin
i := WordScanIndex(pointer(fOids), fOidsCount, cOID);
if i >= 0 then
result := fOidsFieldTypes[i]
else
result := ftUTF8;
end
else
result := ftUTF8;
end;
procedure TSQLDBPostgresConnectionProperties.MapOid(cOid: cardinal;
fieldType: TSQLDBFieldType);
var
i: PtrInt;
begin
if cOID > 65535 then
raise ESQLDBPostgres.CreateUTF8('Out of range %.MapOid(%)', [self, cOID]);
i := WordScanIndex(pointer(fOids), fOidsCount, cOID);
if i < 0 then
begin
i := FOidsCount;
inc(FOidsCount);
if i = length(FOids) then
begin
SetLength(fOids, i + 32);
SetLength(fOidsFieldTypes, i + 32);
end;
fOids[i] := cOid;
end;
fOidsFieldTypes[i] := fieldType // set or replace
end;
procedure TSQLDBPostgresStatement.BindColumns;
var
nCols, c: integer;
cName: RawUTF8;
begin
fColumn.Clear;
fColumn.ReHash;
nCols := PQ.nfields(fRes);
fColumn.Capacity := nCols;
for c := 0 to nCols - 1 do
begin
cName := PQ.fname(fRes, c);
with PSQLDBColumnProperty(fColumn.AddAndMakeUniqueName(cName))^ do
begin
ColumnAttr := PQ.ftype(fRes, c);
ColumnType := TSQLDBPostgresConnectionProperties(Connection.
Properties).Oid2FieldType(ColumnAttr);
end;
end;
end;
procedure TSQLDBPostgresStatement.CheckColAndRowset(const Col: integer);
begin
CheckCol(Col);
if (fRes = nil) or (fResStatus <> PGRES_TUPLES_OK) then
raise ESQLDBPostgres.CreateUTF8('%.Execute not called before Column*', [self]);
end;
destructor TSQLDBPostgresStatement.Destroy;
begin
try
Reset; // close result if any
finally
inherited;
end;
end;
// see https://www.postgresql.org/docs/9.3/libpq-exec.html
procedure TSQLDBPostgresStatement.Prepare(const aSQL: RawUTF8; ExpectResults: boolean);
begin
SQLLogBegin(sllDB);
if aSQL = '' then
raise ESQLDBPostgres.CreateUTF8('%.Prepare: empty statement', [self]);
inherited Prepare(aSQL, ExpectResults); // will strip last ;
fPreparedParamsCount := ReplaceParamsByNumbers(fSQL, fSQLPrepared, '$');
if (fPreparedParamsCount > 0) and (IdemPCharArray(pointer(fSQLPrepared),
['SELECT', 'INSERT', 'UPDATE', 'DELETE', 'VALUES']) >= 0) then
begin // preparable
fCacheIndex := TSQLDBPostgresConnection(fConnection).PrepareCached(
fSQLPrepared, fPreparedParamsCount, fPreparedStmtName);
SQLLogEnd(' name=% cache=%', [fPreparedStmtName, fCacheIndex]);
end
else
SQLLogEnd;
SetLength(fPGParams, fPreparedParamsCount);
SetLength(fPGParamFormats, fPreparedParamsCount);
SetLength(fPGparamLengths, fPreparedParamsCount);
end;
procedure TSQLDBPostgresStatement.ExecutePrepared;
var
i: PtrInt;
p: PSQLDBParam;
c: TSQLDBPostgresConnection;
begin
SQLLogBegin(sllSQL);
if fSQLPrepared = '' then
raise ESQLDBPostgres.CreateUTF8('%.ExecutePrepared: Statement not prepared', [self]);
if fParamCount <> fPreparedParamsCount then
raise ESQLDBPostgres.CreateUTF8('%.ExecutePrepared: Query expects % parameters ' +
'but % bound', [self, fPreparedParamsCount, fParamCount]);
inherited ExecutePrepared;
for i := 0 to fParamCount - 1 do // set parameters as expected by PostgreSQL
begin
// mark parameter as textual by default, with no blob len
fPGParamFormats[i] := 0;
fPGparamLengths[i] := 0;
// convert parameter value as text stored in p^.VData
p := @fParams[i];
if p^.VArray <> nil then
begin
if not (p^.VType in [ftInt64, ftDouble, ftCurrency, ftDate, ftUTF8]) then
raise ESQLDBPostgres.CreateUTF8('%.ExecutePrepared: Invalid array type % ' +
'on bound parameter #%', [Self, ToText(p^.VType)^, i]);
p^.VData := BoundArrayToJSONArray(p^.VArray);
end
else
begin
case p^.VType of
ftNull:
p^.VData := '';
ftInt64:
// use SwapEndian + binary ?
Int64ToUtf8(p^.VInt64, RawUTF8(p^.VData));
ftCurrency:
Curr64ToStr(p^.VInt64, RawUTF8(p^.VData));
ftDouble:
DoubleToStr(PDouble(@p^.VInt64)^, RawUTF8(p^.VData));
ftDate:
// Postgres expects space instead of T in ISO8601 expanded format
p^.VData := DateTimeToIso8601(
PDateTime(@p^.VInt64)^, true, ' ', fForceDateWithMS);
ftUTF8:
; // text already in p^.VData
ftBlob:
begin
fPGParamFormats[i] := 1; // binary
fPGparamLengths[i] := length(p^.VData);
end;
else
raise ESQLDBPostgres.CreateUTF8('%.ExecutePrepared: cannot bind ' +
'parameter #% of type %', [self, i, ToText(p^.VType)^]);
end;
end;
fPGParams[i] := pointer(p^.VData);
end;
c := TSQLDBPostgresConnection(Connection);
if fPreparedStmtName <> '' then
fRes := PQ.ExecPrepared(c.fPGConn, pointer(fPreparedStmtName), fPreparedParamsCount,
pointer(fPGParams), pointer(fPGparamLengths), pointer(fPGParamFormats), PGFMT_TEXT)
else if fPreparedParamsCount = 0 then
// PQexec handles multiple SQL commands
fRes := PQ.Exec(c.fPGConn, pointer(fSQLPrepared)) else
fRes := PQ.ExecParams(c.fPGConn, pointer(fSQLPrepared), fPreparedParamsCount, nil,
pointer(fPGParams), pointer(fPGparamLengths), pointer(fPGParamFormats), PGFMT_TEXT);
PQ.Check(c.fPGConn, fRes, @fRes, {forceClean=}false);
fResStatus := PQ.ResultStatus(fRes);
if fExpectResults then
begin
if fResStatus <> PGRES_TUPLES_OK then
begin // paranoid check
PQ.Clear(fRes);
fRes := nil;
raise ESQLDBPostgres.CreateUTF8('%.ExecutePrepared: result expected but ' +
'statement did not return tuples', [self]);
end;
fTotalRowsRetrieved := PQ.ntuples(fRes);
fCurrentRow := -1;
if fColumn.Count = 0 then // if columns exist then statement is already cached
BindColumns;
SQLLogEnd(' rows=%', [fTotalRowsRetrieved]);
end
else
SQLLogEnd;
end;
function TSQLDBPostgresStatement.UpdateCount: integer;
begin
result := GetCardinalDef(PQ.cmdTuples(fRes), 0);
end;
procedure TSQLDBPostgresStatement.Reset;
begin
ReleaseRows;
fResStatus := PGRES_EMPTY_QUERY;
inherited Reset;
end;
function TSQLDBPostgresStatement.Step(SeekFirst: boolean): boolean;
begin
if (fRes = nil) or (fResStatus <> PGRES_TUPLES_OK) then
raise ESQLDBPostgres.CreateUTF8('%.Execute should be called before Step', [self]);
if SeekFirst then
fCurrentRow := -1;
result := fCurrentRow + 1 < fTotalRowsRetrieved;
if not result then
exit;
inc(fCurrentRow);
end;
procedure TSQLDBPostgresStatement.ReleaseRows;
begin
if fRes <> nil then
begin
PQ.clear(fRes);
fRes := nil;
end;
inherited ReleaseRows;
end;
function TSQLDBPostgresStatement.ColumnInt(Col: integer): int64;
begin
CheckColAndRowset(Col);
result := GetInt64(PQ.GetValue(fRes, fCurrentRow, Col));
end;
function TSQLDBPostgresStatement.ColumnNull(Col: integer): boolean;
begin
CheckColAndRowset(Col);
result := (PQ.GetIsNull(fRes, fCurrentRow, Col) = 1);
end;
function TSQLDBPostgresStatement.ColumnDouble(Col: integer): double;
begin
CheckColAndRowset(Col);
result := GetExtended(PQ.GetValue(fRes, fCurrentRow, Col));
end;
function TSQLDBPostgresStatement.ColumnDateTime(Col: integer): TDateTime;
begin
CheckColAndRowset(Col);
Iso8601ToDateTimePUTF8CharVar(PQ.GetValue(fRes, fCurrentRow, Col),
PQ.GetLength(fRes, fCurrentRow, Col), result);
end;
function TSQLDBPostgresStatement.ColumnCurrency(Col: integer): currency;
begin
CheckColAndRowset(Col);
PInt64(@result)^ := StrToCurr64(PQ.GetValue(fRes, fCurrentRow, Col));
end;
function TSQLDBPostgresStatement.ColumnUTF8(Col: integer): RawUTF8;
begin
CheckColAndRowset(Col);
PQ.GetRawUTF8(fRes, fCurrentRow, Col, result);
end;
function TSQLDBPostgresStatement.ColumnBlob(Col: integer): RawByteString;
var
P: PAnsiChar;
begin // PGFMT_TEXT was used -> need to convert into binary
CheckColAndRowset(Col);
P := pointer(PQ.GetValue(fRes, fCurrentRow, Col));
SetString(result, P, BlobInPlaceDecode(P, PQ.GetLength(fRes, fCurrentRow, col)));
end;
procedure TSQLDBPostgresStatement.ColumnsToJSON(WR: TJSONWriter);
var
col: integer;
P: pointer;
begin
if (fRes = nil) or (fResStatus <> PGRES_TUPLES_OK) or (fCurrentRow < 0) then
raise ESQLDBPostgres.CreateUTF8('%.ColumnToJSON unexpected', [self]);
if WR.Expand then
WR.Add('{');
for col := 0 to fColumnCount - 1 do
with fColumns[col] do
begin
if WR.Expand then
WR.AddFieldName(ColumnName); // add '"ColumnName":'
if PQ.GetIsNull(fRes, fCurrentRow, col) = 1 then
WR.AddShort('null')
else
begin
P := PQ.GetValue(fRes, fCurrentRow, col);
case ColumnType of
ftNull:
WR.AddShort('null');
ftInt64, ftDouble, ftCurrency:
WR.AddNoJSONEscape(P, PQ.GetLength(fRes, fCurrentRow, col));
ftUTF8:
if (ColumnAttr = JSONOID) or (ColumnAttr = JSONBOID) then
WR.AddNoJSONEscape(P, PQ.GetLength(fRes, fCurrentRow, col))
else
begin
WR.Add('"');
WR.AddJSONEscape(P);
WR.Add('"');
end;
ftDate:
begin
WR.Add('"');
if (PQ.GetLength(fRes, fCurrentRow, col) > 10) and (PAnsiChar(P)[10] = ' ') then
PAnsiChar(P)[10] := 'T'; // ensure strict ISO-8601 encoding
WR.AddJSONEscape(P);
WR.Add('"');
end;
ftBlob:
if fForceBlobAsNull then
WR.AddShort('null')
else
WR.WrBase64(P, BlobInPlaceDecode(P,
PQ.GetLength(fRes, fCurrentRow, col)), {withmagic=}true);
else
raise ESQLDBPostgres.CreateUTF8('%.ColumnsToJSON: %?', [self, ToText(ColumnType)^]);
end;
end;
WR.Add(',');
end;
WR.CancelLastComma; // cancel last ','
if WR.Expand then
WR.Add('}');
end;
initialization
TSQLDBPostgresConnectionProperties.RegisterClassNameForDefinition;
end.
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/wzdlsoft/mORMot.git
[email protected]:wzdlsoft/mORMot.git
wzdlsoft
mORMot
mORMot
master

搜索帮助