2025-05-16 18:00:22 +04:00

55 lines
2.0 KiB
Python

from typing import Any, Dict, List, Union
import pyarrow as pa
def from_list(values: List[Dict[str, Any]], schema: Union[pa.Schema, None] = None) -> pa.Table:
    """
    Build a pyarrow Table from a list of row dictionaries.

    Args:
        values: The rows, one dictionary per row, passed unchanged to
            ``pa.Table.from_pylist``.
        schema: Optional schema forwarded to ``pa.Table.from_pylist``; with
            ``None`` the schema handling is left to pyarrow.

    Returns:
        The table produced by ``pa.Table.from_pylist``.
    """
    return pa.Table.from_pylist(values, schema=schema)
# Arrow type used for each Atlas type label. The labels are the values that
# convert_pyarrow_schema_for_atlas assigns per field before looking them up
# here; note that "string" and "categorical" share the same Arrow type.
permitted_types = {
    "integer": pa.int32(),
    "float": pa.float32(),
    "date": pa.timestamp("ms"),
    "string": pa.string(),
    "categorical": pa.string(),
}
def convert_pyarrow_schema_for_atlas(schema: pa.Schema) -> pa.Schema:
    """
    Convert a pyarrow schema to one with types that match the subset of types supported by Atlas for upload.

    Every regular field is classified by its Arrow type and replaced with the
    matching entry of ``permitted_types``: dictionary -> 'categorical',
    string -> 'string', integer -> 'integer', floating -> 'float' and any
    temporal type -> 'date'. The only underscore-prefixed field accepted is
    ``_embeddings``, which keeps its original type.

    Args:
        schema: The schema to convert.

    Returns:
        A new schema built from a name -> type mapping, listing the converted
        regular fields first (in their original order) followed by the
        passed-through underscore fields.

    Raises:
        ValueError: If a field name starts with an underscore and is not
            ``_embeddings``.
        TypeError: If a field is boolean, list or struct typed, or has any
            other type that matches none of the supported kinds.
    """
    arrow_types = pa.types
    allowed_private = {"_embeddings"}

    # (predicate, message) pairs for types that are explicitly refused.
    rejected = (
        (arrow_types.is_boolean, "Boolean type not supported"),
        (arrow_types.is_list, "List types not supported"),
        (arrow_types.is_struct, "Struct types not supported"),
    )
    # (predicate, Atlas label) pairs, tried in order; first match wins.
    # is_temporal already matches timestamps, so they need no separate entry.
    accepted = (
        (arrow_types.is_dictionary, 'categorical'),
        (arrow_types.is_string, 'string'),
        (arrow_types.is_integer, 'integer'),
        (arrow_types.is_floating, 'float'),
        (arrow_types.is_temporal, 'date'),
    )

    converted = {}
    passthrough = {}
    for field in schema:
        name, dtype = field.name, field.type

        if name.startswith('_'):
            # Underscore fields are private to Atlas and will be handled with their own logic.
            if name not in allowed_private:
                raise ValueError(f"Underscore fields are reserved for Atlas internal use: {name}")
            passthrough[name] = dtype
            continue

        for is_rejected, message in rejected:
            if is_rejected(dtype):
                raise TypeError(f"{message}: {name}")

        label = next((lbl for matches, lbl in accepted if matches(dtype)), None)
        if label is None:
            raise TypeError(f"Unknown type: {name} {dtype}")
        converted[name] = permitted_types[label]

    # Regular fields come first; passed-through private fields are appended.
    return pa.schema({**converted, **passthrough})