Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions bin/chat.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ let outputMode = 'text' // default output mode
const instructions =
'You are a machine learning web application named "Hyperparam" running on a CLI terminal.'
+ '\nYou assist users with analyzing and exploring datasets, particularly in parquet format.'
+ ' The website and api are available at hyperparam.app.'
+ ' The website is available at hyperparam.app.'
+ ' The Hyperparam CLI tool can list and explore local parquet files.'
+ '\nYou are on a terminal and can only output: text, emojis, terminal colors, and terminal formatting.'
+ ' Don\'t add additional markdown or html formatting unless requested.'
+ ' Limited markdown formatting is available: inline code blocks.'
+ (process.stdout.isTTY ? ` The terminal width is ${process.stdout.columns} characters.` : '')

const colors = {
Expand Down
100 changes: 67 additions & 33 deletions bin/tools/parquetDataSource.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
import { parquetSchema } from 'hyparquet'
import { parquetPlan } from 'hyparquet/src/plan.js'
import { asyncGroupToRows, readRowGroup } from 'hyparquet/src/rowgroup.js'
import { parquetMetadataAsync, parquetReadObjects } from 'hyparquet'
import { whereToParquetFilter } from './parquetFilter.js'

/**
* @import { AsyncBuffer, Compressors, FileMetaData } from 'hyparquet'
* @import { AsyncDataSource } from 'squirreling'
* @import { AsyncBuffer, Compressors, FileMetaData, ParquetQueryFilter } from 'hyparquet'
* @import { AsyncDataSource, AsyncRow, SqlPrimitive } from 'squirreling'
* @import { AsyncCells } from 'squirreling/src/types.js'
*/

Expand All @@ -20,39 +18,75 @@ import { whereToParquetFilter } from './parquetFilter.js'
export function parquetDataSource(file, metadata, compressors) {
return {
async *scan(hints) {
const options = {
file,
metadata,
compressors,
columns: hints?.columns,
// convert WHERE clause to parquet pushdown filter
filter: whereToParquetFilter(hints?.where),
filterStrict: false,
}
metadata ??= await parquetMetadataAsync(file)

// TODO: check that columns exist in parquet file
let { columns } = options
if (!columns?.length) {
const schema = parquetSchema(metadata)
columns = schema.children.map(col => col.element.name)
}
// Convert WHERE AST to hyparquet filter format
const whereFilter = hints?.where && whereToParquetFilter(hints.where)
/** @type {ParquetQueryFilter | undefined} */
const filter = hints?.where ? whereFilter : undefined
const filterApplied = !filter || whereFilter

const plan = parquetPlan(options)
for (const subplan of plan.groups) {
// Read row group
const rg = readRowGroup(options, plan, subplan)
// Transpose to materialized rows
const rows = await asyncGroupToRows(rg, 0, rg.groupRows, undefined, 'object')
// Convert to AsyncRow generator
for (const row of rows) {
/** @type {AsyncCells} */
const cells = {}
for (const [key, value] of Object.entries(row)) {
cells[key] = () => Promise.resolve(value)
// Emit rows by row group
let groupStart = 0
let remainingLimit = hints?.limit ?? Infinity
for (const rowGroup of metadata.row_groups) {
const rowCount = Number(rowGroup.num_rows)

// Skip row groups by offset if where is fully applied
let safeOffset = 0
let safeLimit = rowCount
if (filterApplied) {
if (hints?.offset !== undefined && groupStart < hints.offset) {
safeOffset = Math.min(rowCount, hints.offset - groupStart)
}
yield { columns, cells }
safeLimit = Math.min(rowCount - safeOffset, remainingLimit)
if (safeLimit <= 0 && safeOffset < rowCount) break
}
for (let i = 0; i < safeOffset; i++) {
// yield empty rows
yield asyncRow({})
}
if (safeOffset === rowCount) {
groupStart += rowCount
continue
}

// Read objects from this row group
const data = await parquetReadObjects({
file,
metadata,
rowStart: groupStart + safeOffset,
rowEnd: groupStart + safeOffset + safeLimit,
columns: hints?.columns,
filter,
filterStrict: false,
compressors,
useOffsetIndex: true,
})

// Yield each row
for (const row of data) {
yield asyncRow(row)
}

remainingLimit -= data.length
groupStart += rowCount
}
},
}
}

/**
 * Wraps a plain JavaScript object as an AsyncRow accessor: each value is
 * captured eagerly and exposed as a lazy, promise-returning cell getter.
 *
 * @param {Record<string, SqlPrimitive>} obj - the plain object
 * @returns {AsyncRow} a row accessor interface
 */
function asyncRow(obj) {
  const columns = Object.keys(obj)
  // Build one promise-returning accessor per entry; values are captured
  // now, so later mutation of obj does not affect the row.
  /** @type {AsyncCells} */
  const cells = Object.fromEntries(
    Object.entries(obj).map(([key, value]) => [key, () => Promise.resolve(value)])
  )
  return { columns, cells }
}
22 changes: 12 additions & 10 deletions bin/tools/parquetSql.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { asyncBufferFromFile, parquetMetadataAsync } from 'hyparquet'
import { asyncBufferFromFile, asyncBufferFromUrl, parquetMetadataAsync } from 'hyparquet'
import { compressors } from 'hyparquet-compressors'
import { collect, executeSql } from 'squirreling'
import { parquetDataSource } from './parquetDataSource.js'
Expand Down Expand Up @@ -27,9 +27,9 @@ export const parquetSql = {
parameters: {
type: 'object',
properties: {
filename: {
file: {
type: 'string',
description: 'The name of the parquet file to query.',
description: 'The parquet file to query either local file path or url.',
},
query: {
type: 'string',
Expand All @@ -40,16 +40,16 @@ export const parquetSql = {
description: 'Whether to truncate long string values in the results. If true (default), each string cell is limited to 1000 characters. If false, each string cell is limited to 10,000 characters.',
},
},
required: ['filename', 'query'],
required: ['file', 'query'],
},
},
/**
* @param {Record<string, unknown>} args
* @returns {Promise<string>}
*/
async handleToolCall({ filename, query, truncate = true }) {
if (typeof filename !== 'string') {
throw new Error('Expected filename to be a string')
async handleToolCall({ file, query, truncate = true }) {
if (typeof file !== 'string') {
throw new Error('Expected file to be a string')
}
if (typeof query !== 'string' || query.trim().length === 0) {
throw new Error('Query parameter must be a non-empty string')
Expand All @@ -59,9 +59,11 @@ export const parquetSql = {
const startTime = performance.now()

// Load parquet file and create data source
const file = await asyncBufferFromFile(filename)
const metadata = await parquetMetadataAsync(file)
const table = parquetDataSource(file, metadata, compressors)
const asyncBuffer = file.startsWith('http://') || file.startsWith('https://')
? await asyncBufferFromUrl({ url: file })
: await asyncBufferFromFile(file)
const metadata = await parquetMetadataAsync(asyncBuffer)
const table = parquetDataSource(asyncBuffer, metadata, compressors)

// Execute SQL query
const results = await collect(executeSql({ tables: { table }, query }))
Expand Down