# taibai-python **Repository Path**: coraldane/taibai-python ## Basic Information - **Project Name**: taibai-python - **Description**: 为taibai数据库编写的python驱动 - **Primary Language**: Python - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2026-06-17 - **Last Updated**: 2026-06-17 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # taibai-python Taibai 列式数据库的官方 **asyncio** Python 驱动。零第三方依赖,仅使用标准库 (`asyncio`、`struct`、`ssl`、`zlib`、`json`)。 驱动实现了 Taibai 线协议的完整能力:连接与认证、文本/二进制查询结果、预处理语句、 事务、flush、列式批量导入、pipeline、变更订阅,以及全部 19 种 wire 数据类型。 ## 安装 ```bash pip install -e . ``` 要求 Python >= 3.10。 ## 快速开始 ```python import asyncio from taibai import Client async def main(): client = await Client.connect("127.0.0.1", 5151, user="root", password="taibai", database="main") result = await client.query("SELECT 1 AS x, 'hi' AS s") print(result.dicts()) # [{'x': 1, 's': 'hi'}] print(result.columns) # ['x', 's'] await client.close() asyncio.run(main()) ``` `Client.connect(...)` 默认自动登录(可用 `auto_login=False` 关闭)。 `Client` 也是异步上下文管理器: ```python async with await Client.connect(database="demo") as client: rows = await client.query("SELECT * FROM ticks LIMIT 10") ``` ## 功能矩阵 | 能力 | API | | --- | --- | | 连接 / 登录(含时区) | `Client.connect`, `client.login` | | 健康检查 | `client.ping` | | 查询(文本/二进制结果) | `client.query` | | 执行(返回影响行数) | `client.execute` | | 预处理语句 | `client.prepare`, `execute_prepared`, `execute_prepared_batch`, `close_statement` | | 事务 | `client.begin/commit/rollback`, `async with client.transaction()` | | flush 缓冲区 | `client.flush` | | 列式批量写入(单次) | `client.bulk_insert`, `bulk_insert_columnar` | | 高吞吐批量导入(pipeline) | `BulkImporter` | | 管道写入 | `start_pipeline / pipeline_bulk_insert_columnar / drain_pipeline / stop_pipeline` | | 变更订阅 | `subscribe_changes`, `recv_change_event`, `ack_change_consumer` | | 连接池 | `Pool` | | TLS | `TlsConfig` | | 压缩 | 连接级 `compress=True`(zlib,自动) | ## 数据类型 支持全部 19 种 wire 类型,编号与服务端一致: `Bool, Int8, Int16, Int32, Int64, UInt8, UInt16, UInt32, UInt64, Float32, Float64, Decimal, Date, Time, Timestamp, Interval, VarChar, Blob, Vector`。 - 固定宽度类型使用 little-endian。 - `VarChar`/`Blob`/`Vector` 使用 4 字节长度前缀。 - 定标 `Decimal`(scale>0)在文本结果中为字符串,驱动会还原为放大整数; 二进制结果中按 i64 读取(与服务端一致)。 - `Vector` 为 `list[float]`(f32)。 预处理参数可直接传 Python 原生值(自动推断类型),或用 `ScalarValue(DataType.X, value)` 精确指定类型;`None` 表示 NULL。 ## 协议要点 - 帧格式:`magic(0x01,0xDB) | msg_type(u8) | payload_len(u32 LE) | payload`。 - 默认最大消息 64 MiB。 - payload 以 `0x78` 开头时按 zlib 解压。 - 登录后服务端会对查询返回二进制列式结果(`binary_results=True`,默认开启)。 - `OK` 响应前 8 字节为 `u64 rows_affected`,可能附带 `MXTP`/`MXTM` 计时尾部。 ## 批量导入 ```python from taibai import BulkImporter, ColumnDef, DataType columns = [ColumnDef("symbol", DataType.VarChar), ColumnDef("price", DataType.Float64)] importer = BulkImporter(client, "ticks", columns, pipeline_depth=8, disable_wal=True) await importer.start() await importer.insert_batch([["AAPL", "MSFT"], [191.2, 420.1]]) # 按列提供数据 stats = await importer.finish() print(stats.rows_per_sec) ``` ## 变更订阅 订阅连接进入流模式,不应再复用为普通查询连接: ```python sub = await Client.connect(database="demo") await sub.subscribe_changes(database="demo", table="ticks", include_recent=True) event = await sub.recv_change_event() # dict(JSON) await sub.ack_change_consumer("grp", event["sequence"]) ``` ## 示例 见 `examples/`:`basic_query.py`、`prepared_and_txn.py`、`bulk_import_and_changes.py`。 ## 测试 ```bash pip install -e ".[dev]" pytest ``` 单元测试覆盖帧编解码、类型编解码与文本/二进制结果往返;集成测试需要运行中的服务端。 ## 许可证 MIT OR Apache-2.0