Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions chdb-purego/binding.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package chdbpurego
import (
"os"
"os/exec"
"unsafe"

"github.com/ebitengine/purego"
)
Expand Down Expand Up @@ -35,6 +36,7 @@ func findLibrary() string {
}

var (
// old API
queryStable func(argc int, argv []string) *local_result
freeResult func(result *local_result)
queryStableV2 func(argc int, argv []string) *local_result_v2
Expand All @@ -47,6 +49,23 @@ var (
streamingResultNext func(conn *chdb_conn, result *chdb_streaming_result) *local_result_v2
streamingResultDestroy func(result *chdb_streaming_result)
streamingResultCancel func(conn *chdb_conn, result *chdb_streaming_result)

// new API
chdbConnect func(argc int, argv []*byte) *chdb_connection
chdbCloseConn func(conn *chdb_connection)
chdbQuery func(conn unsafe.Pointer, query string, format string) *chdb_result
chdbStreamQuery func(conn unsafe.Pointer, query string, format string) *chdb_result
chdbStreamFetchResult func(conn unsafe.Pointer, result *chdb_result) *chdb_result
chdbStreamCancelQuery func(conn *chdb_connection, result *chdb_result)
chdbDestroyQueryResult func(result *chdb_result)
chdbResultBuffer func(result *chdb_result) *byte
chdbResultLen func(result *chdb_result) uint //size_t
chdbResultElapsed func(result *chdb_result) float64 // double
chdbResultRowsRead func(result *chdb_result) uint64
chdbResultBytesRead func(result *chdb_result) uint64
chdbResultStorageRowsRead func(result *chdb_result) uint64
chdbResultStorageBytesRead func(result *chdb_result) uint64
chdbResultError func(result *chdb_result) string
)

func init() {
Expand All @@ -69,4 +88,21 @@ func init() {
purego.RegisterLibFunc(&streamingResultCancel, libchdb, "chdb_streaming_cancel_query")
purego.RegisterLibFunc(&streamingResultDestroy, libchdb, "chdb_destroy_result")

// new API
purego.RegisterLibFunc(&chdbConnect, libchdb, "chdb_connect")
purego.RegisterLibFunc(&chdbCloseConn, libchdb, "chdb_close_conn")
purego.RegisterLibFunc(&chdbQuery, libchdb, "chdb_query")
purego.RegisterLibFunc(&chdbStreamQuery, libchdb, "chdb_stream_query")
purego.RegisterLibFunc(&chdbStreamFetchResult, libchdb, "chdb_stream_fetch_result")
purego.RegisterLibFunc(&chdbStreamCancelQuery, libchdb, "chdb_stream_cancel_query")
purego.RegisterLibFunc(&chdbDestroyQueryResult, libchdb, "chdb_destroy_query_result")
purego.RegisterLibFunc(&chdbResultBuffer, libchdb, "chdb_result_buffer")
purego.RegisterLibFunc(&chdbResultLen, libchdb, "chdb_result_length")
purego.RegisterLibFunc(&chdbResultElapsed, libchdb, "chdb_result_elapsed")
purego.RegisterLibFunc(&chdbResultRowsRead, libchdb, "chdb_result_rows_read")
purego.RegisterLibFunc(&chdbResultBytesRead, libchdb, "chdb_result_bytes_read")
purego.RegisterLibFunc(&chdbResultStorageRowsRead, libchdb, "chdb_result_storage_rows_read")
purego.RegisterLibFunc(&chdbResultStorageBytesRead, libchdb, "chdb_result_storage_bytes_read")
purego.RegisterLibFunc(&chdbResultError, libchdb, "chdb_result_error")

}
87 changes: 40 additions & 47 deletions chdb-purego/chdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ import (
)

type result struct {
localResv2 *local_result_v2
chdb_result *chdb_result
}

func newChdbResult(cRes *local_result_v2) ChdbResult {
func newChdbResult(cRes *chdb_result) ChdbResult {
res := &result{
localResv2: cRes,
chdb_result: cRes,
}
// runtime.SetFinalizer(res, res.Free)
return res
Expand All @@ -26,61 +26,66 @@ func newChdbResult(cRes *local_result_v2) ChdbResult {

// Buf implements ChdbResult.
func (c *result) Buf() []byte {
if c.localResv2 != nil {
if c.localResv2.buf != nil && c.localResv2.len > 0 {
return unsafe.Slice(c.localResv2.buf, c.localResv2.len)
if c.chdb_result != nil {
buf := chdbResultBuffer(c.chdb_result)
if buf != nil {
// Assuming we have a way to get the length of the buffer
// Thlis is a placeholder; replace with actual length retrieva logic
length := c.Len() // Replace with actual length
return unsafe.Slice(buf, length)
}

}
return nil
}

// BytesRead implements ChdbResult.
func (c *result) BytesRead() uint64 {
if c.localResv2 != nil {
return c.localResv2.bytes_read
if c.chdb_result != nil {
return chdbResultBytesRead(c.chdb_result)
}
return 0
}

// Elapsed implements ChdbResult.
func (c *result) Elapsed() float64 {
if c.localResv2 != nil {
return c.localResv2.elapsed
if c.chdb_result != nil {
return chdbResultElapsed(c.chdb_result)
}
return 0
}

// Error implements ChdbResult.
func (c *result) Error() error {
if c.localResv2 != nil {
if c.localResv2.error_message != nil {
return errors.New(ptrToGoString(c.localResv2.error_message))
if c.chdb_result != nil {
if s := chdbResultError(c.chdb_result); s != "" {
return errors.New(s)
}
}
return nil
}

// Free implements ChdbResult.
func (c *result) Free() {
if c.localResv2 != nil {
freeResultV2(c.localResv2)
c.localResv2 = nil
if c.chdb_result != nil {
chdbDestroyQueryResult(c.chdb_result)
c.chdb_result = nil
}

}

// Len implements ChdbResult.
func (c *result) Len() int {
if c.localResv2 != nil {
return int(c.localResv2.len)
if c.chdb_result != nil {
return int(chdbResultLen(c.chdb_result))
}
return 0
}

// RowsRead implements ChdbResult.
func (c *result) RowsRead() uint64 {
if c.localResv2 != nil {
return c.localResv2.rows_read
if c.chdb_result != nil {
return chdbResultRowsRead(c.chdb_result)
}
return 0
}
Expand All @@ -95,15 +100,10 @@ func (c *result) String() string {
}

type connection struct {
conn **chdb_conn
}

// CancelQuery implements ChdbConn.
func (c *connection) CancelQuery(query ChdbResult) (err error) {
panic("unimplemented")
conn *chdb_connection
}

func newChdbConn(conn **chdb_conn) ChdbConn {
func newChdbConn(conn *chdb_connection) ChdbConn {
c := &connection{
conn: conn,
}
Expand All @@ -114,28 +114,26 @@ func newChdbConn(conn **chdb_conn) ChdbConn {
// Close implements ChdbConn.
func (c *connection) Close() {
if c.conn != nil {
closeConn(c.conn)
chdbCloseConn(c.conn)
}
}

// Query implements ChdbConn.
func (c *connection) Query(queryStr string, formatStr string) (result ChdbResult, err error) {

if c.conn == nil {
return nil, fmt.Errorf("invalid connection")
}

rawConn := *c.conn

res := queryConn(rawConn, queryStr, formatStr)
res := chdbQuery(c.conn.internal_data, queryStr, formatStr)
if res == nil {
// According to the C ABI of chDB v1.2.0, the C function query_stable_v2
// returns nil if the query returns no data. This is not an error. We
// will change this behavior in the future.
return newChdbResult(res), nil
}
if res.error_message != nil {
return nil, errors.New(ptrToGoString(res.error_message))
errMsg := chdbResultError(res)
if errMsg != "" {
return nil, errors.New(errMsg)
}

return newChdbResult(res), nil
Expand All @@ -148,28 +146,23 @@ func (c *connection) QueryStreaming(queryStr string, formatStr string) (result C
return nil, fmt.Errorf("invalid connection")
}

rawConn := *c.conn

res := queryConnStreaming(rawConn, queryStr, formatStr)
res := chdbStreamQuery(c.conn.internal_data, queryStr, formatStr)
if res == nil {
// According to the C ABI of chDB v1.2.0, the C function query_stable_v2
// returns nil if the query returns no data. This is not an error. We
// will change this behavior in the future.
return newStreamingResult(rawConn, res), nil
return newStreamingResult(c.conn, res), nil
}
if s := streamingResultError(res); s != nil {
return nil, errors.New(*s)
if s := chdbResultError(res); s != "" {
return nil, errors.New(s)
}

return newStreamingResult(rawConn, res), nil
return newStreamingResult(c.conn, res), nil
}

func (c *connection) Ready() bool {
if c.conn != nil {
deref := *c.conn
if deref != nil {
return deref.connected
}
return true
}
return false
}
Expand Down Expand Up @@ -221,15 +214,15 @@ func NewConnection(argc int, argv []string) (ChdbConn, error) {
// fmt.Println("arg: ", arg)
// }

var conn **chdb_conn
var conn *chdb_connection
var err error
func() {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("C++ exception: %v", r)
}
}()
conn = connectChdb(len(new_argv), c_argv)
conn = chdbConnect(len(new_argv), c_argv)
}()

if err != nil {
Expand Down
21 changes: 12 additions & 9 deletions chdb-purego/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ package chdbpurego
import "errors"

type streamingResult struct {
curConn *chdb_conn
stream *chdb_streaming_result
curConn *chdb_connection
stream *chdb_result
curChunk ChdbResult
}

func newStreamingResult(conn *chdb_conn, cRes *chdb_streaming_result) ChdbStreamResult {
func newStreamingResult(conn *chdb_connection, cRes *chdb_result) ChdbStreamResult {

// nextChunk := streamingResultNext(conn, cRes)
// if nextChunk == nil {
Expand All @@ -28,16 +28,19 @@ func newStreamingResult(conn *chdb_conn, cRes *chdb_streaming_result) ChdbStream

// Error implements ChdbStreamResult.
func (c *streamingResult) Error() error {
if s := streamingResultError(c.stream); s != nil {
return errors.New(*s)
if s := chdbResultError(c.stream); s != "" {
return errors.New(s)
}
return nil
}

// Free implements ChdbStreamResult.
func (c *streamingResult) Free() {
streamingResultCancel(c.curConn, c.stream)
streamingResultDestroy(c.stream)
if c.curConn != nil && c.stream != nil {
chdbStreamCancelQuery(c.curConn, c.stream)
chdbDestroyQueryResult(c.stream)
}

c.stream = nil
if c.curChunk != nil {
c.curChunk.Free()
Expand All @@ -53,7 +56,7 @@ func (c *streamingResult) Cancel() {
// GetNext implements ChdbStreamResult.
func (c *streamingResult) GetNext() ChdbResult {
if c.curChunk == nil {
nextChunk := streamingResultNext(c.curConn, c.stream)
nextChunk := chdbStreamFetchResult(c.curConn.internal_data, c.stream)
if nextChunk == nil {
return nil
}
Expand All @@ -63,7 +66,7 @@ func (c *streamingResult) GetNext() ChdbResult {
// free the current chunk before getting the next one
c.curChunk.Free()
c.curChunk = nil
nextChunk := streamingResultNext(c.curConn, c.stream)
nextChunk := chdbStreamFetchResult(c.curConn.internal_data, c.stream)
if nextChunk == nil {
return nil
}
Expand Down
8 changes: 8 additions & 0 deletions chdb-purego/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@ type chdb_conn struct {
queue unsafe.Pointer
}

type chdb_connection struct {
internal_data unsafe.Pointer
}

type chdb_result struct {
internal_data unsafe.Pointer
}

type ChdbResult interface {
Buf() []byte
// String rapresentation of the the buffer
Expand Down
Loading