164 lines
3.7 KiB
Go
164 lines
3.7 KiB
Go
|
package clickhouse
|
||
|
|
||
|
import (
|
||
|
"database/sql/driver"
|
||
|
"fmt"
|
||
|
"io"
|
||
|
"reflect"
|
||
|
"sync"
|
||
|
"time"
|
||
|
|
||
|
"github.com/kshvakov/clickhouse/lib/column"
|
||
|
"github.com/kshvakov/clickhouse/lib/data"
|
||
|
"github.com/kshvakov/clickhouse/lib/protocol"
|
||
|
)
|
||
|
|
||
|
type rows struct {
|
||
|
ch *clickhouse
|
||
|
err error
|
||
|
mutex sync.RWMutex
|
||
|
finish func()
|
||
|
offset int
|
||
|
block *data.Block
|
||
|
totals *data.Block
|
||
|
extremes *data.Block
|
||
|
stream chan *data.Block
|
||
|
columns []string
|
||
|
blockColumns []column.Column
|
||
|
}
|
||
|
|
||
|
func (rows *rows) Columns() []string {
|
||
|
return rows.columns
|
||
|
}
|
||
|
|
||
|
func (rows *rows) ColumnTypeScanType(idx int) reflect.Type {
|
||
|
return rows.blockColumns[idx].ScanType()
|
||
|
}
|
||
|
|
||
|
func (rows *rows) ColumnTypeDatabaseTypeName(idx int) string {
|
||
|
return rows.blockColumns[idx].CHType()
|
||
|
}
|
||
|
|
||
|
func (rows *rows) Next(dest []driver.Value) error {
|
||
|
if rows.block == nil || int(rows.block.NumRows) <= rows.offset {
|
||
|
switch block, ok := <-rows.stream; true {
|
||
|
case !ok:
|
||
|
if err := rows.error(); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
return io.EOF
|
||
|
default:
|
||
|
rows.block = block
|
||
|
rows.offset = 0
|
||
|
}
|
||
|
}
|
||
|
for i := range dest {
|
||
|
dest[i] = rows.block.Values[i][rows.offset]
|
||
|
}
|
||
|
rows.offset++
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (rows *rows) HasNextResultSet() bool {
|
||
|
return rows.totals != nil || rows.extremes != nil
|
||
|
}
|
||
|
|
||
|
func (rows *rows) NextResultSet() error {
|
||
|
switch {
|
||
|
case rows.totals != nil:
|
||
|
rows.block = rows.totals
|
||
|
rows.offset = 0
|
||
|
rows.totals = nil
|
||
|
case rows.extremes != nil:
|
||
|
rows.block = rows.extremes
|
||
|
rows.offset = 0
|
||
|
rows.extremes = nil
|
||
|
default:
|
||
|
return io.EOF
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (rows *rows) receiveData() error {
|
||
|
defer close(rows.stream)
|
||
|
var (
|
||
|
err error
|
||
|
packet uint64
|
||
|
progress *progress
|
||
|
profileInfo *profileInfo
|
||
|
)
|
||
|
for {
|
||
|
if packet, err = rows.ch.decoder.Uvarint(); err != nil {
|
||
|
return rows.setError(err)
|
||
|
}
|
||
|
switch packet {
|
||
|
case protocol.ServerException:
|
||
|
rows.ch.logf("[rows] <- exception")
|
||
|
return rows.setError(rows.ch.exception())
|
||
|
case protocol.ServerProgress:
|
||
|
if progress, err = rows.ch.progress(); err != nil {
|
||
|
return rows.setError(err)
|
||
|
}
|
||
|
rows.ch.logf("[rows] <- progress: rows=%d, bytes=%d, total rows=%d",
|
||
|
progress.rows,
|
||
|
progress.bytes,
|
||
|
progress.totalRows,
|
||
|
)
|
||
|
case protocol.ServerProfileInfo:
|
||
|
if profileInfo, err = rows.ch.profileInfo(); err != nil {
|
||
|
return rows.setError(err)
|
||
|
}
|
||
|
rows.ch.logf("[rows] <- profiling: rows=%d, bytes=%d, blocks=%d", profileInfo.rows, profileInfo.bytes, profileInfo.blocks)
|
||
|
case protocol.ServerData, protocol.ServerTotals, protocol.ServerExtremes:
|
||
|
var (
|
||
|
block *data.Block
|
||
|
begin = time.Now()
|
||
|
)
|
||
|
if block, err = rows.ch.readBlock(); err != nil {
|
||
|
return rows.setError(err)
|
||
|
}
|
||
|
rows.ch.logf("[rows] <- data: packet=%d, columns=%d, rows=%d, elapsed=%s", packet, block.NumColumns, block.NumRows, time.Since(begin))
|
||
|
if block.NumRows == 0 {
|
||
|
continue
|
||
|
}
|
||
|
switch packet {
|
||
|
case protocol.ServerData:
|
||
|
rows.stream <- block
|
||
|
case protocol.ServerTotals:
|
||
|
rows.totals = block
|
||
|
case protocol.ServerExtremes:
|
||
|
rows.extremes = block
|
||
|
}
|
||
|
case protocol.ServerEndOfStream:
|
||
|
rows.ch.logf("[rows] <- end of stream")
|
||
|
return nil
|
||
|
default:
|
||
|
rows.ch.conn.Close()
|
||
|
rows.ch.logf("[rows] unexpected packet [%d]", packet)
|
||
|
return rows.setError(fmt.Errorf("[rows] unexpected packet [%d] from server", packet))
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (rows *rows) Close() error {
|
||
|
rows.ch.logf("[rows] close")
|
||
|
rows.columns = nil
|
||
|
for range rows.stream {
|
||
|
}
|
||
|
rows.finish()
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (rows *rows) error() error {
|
||
|
rows.mutex.RLock()
|
||
|
defer rows.mutex.RUnlock()
|
||
|
return rows.err
|
||
|
}
|
||
|
|
||
|
func (rows *rows) setError(err error) error {
|
||
|
rows.mutex.Lock()
|
||
|
rows.err = err
|
||
|
rows.mutex.Unlock()
|
||
|
return err
|
||
|
}
|