-
-
Notifications
You must be signed in to change notification settings - Fork 29
ENH: use new columnar GetArrowStream if GDAL>=3.6 and pyarrow available #155
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
cf092b2
13d4b61
4eaba0d
9389dc4
d1a1153
eb5943d
bfa8ae7
41be9cc
eabed97
5c039f8
74706f8
c643c5d
755b43b
fd21ad5
a67ec7f
f80278c
7759a47
fd6e175
dff16ee
7320702
26d0910
616f090
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,7 +11,7 @@ import math | |
import os | ||
import warnings | ||
|
||
from libc.stdint cimport uint8_t | ||
from libc.stdint cimport uint8_t, uintptr_t | ||
from libc.stdlib cimport malloc, free | ||
from libc.string cimport strlen | ||
from libc.math cimport isnan | ||
|
@@ -931,6 +931,147 @@ def ogr_read( | |
) | ||
|
||
|
||
def ogr_read_arrow(
    str path,
    object layer=None,
    object encoding=None,
    int read_geometry=True,
    int force_2d=False,
    object columns=None,
    int skip_features=0,
    int max_features=0,
    object where=None,
    tuple bbox=None,
    object fids=None,
    str sql=None,
    str sql_dialect=None,
    int return_fids=False,
    **kwargs):
    """Read a layer into a pyarrow Table via GDAL's columnar Arrow stream.

    Parameters
    ----------
    path : str
        Data source path; encoded as UTF-8 before being handed to GDAL.
    layer : object, optional
        Layer identifier passed to ``get_ogr_layer``; defaults to 0 when
        ``sql`` is not given. Cannot be combined with ``sql``.
    encoding : object, optional
        Text encoding; when falsy it falls back to ``detect_encoding`` and
        then to ``locale.getpreferredencoding()``.
    read_geometry, force_2d, return_fids : int
        Accepted for signature parity with the non-Arrow reader but not
        consulted anywhere in this function.
    columns : object, optional
        Field names to keep. In this function it only narrows
        ``meta['fields']``; the returned table is not filtered by it.
    skip_features, max_features : int
        Checked by ``validate_feature_range``; see the NOTE in the body
        about them not being applied to the stream.
    where : object, optional
        Attribute filter, applied when not ``None`` and not empty.
    bbox : tuple, optional
        Spatial filter, applied when not ``None``.
    fids : object, optional
        Not supported by this reader; any non-``None`` value raises.
    sql, sql_dialect : str, optional
        When ``sql`` is given the layer is the result of ``execute_sql``.
    **kwargs
        Forwarded to ``ogr_open``.

    Returns
    -------
    tuple
        ``(meta, table, None, None)`` where ``meta`` is a dict with keys
        ``crs``, ``encoding``, ``fields``, ``geometry_type`` and
        ``geometry_name`` and ``table`` is the pyarrow Table read from the
        stream. The two trailing ``None`` values stand in for the
        geometries and field data slots.

    Raises
    ------
    ValueError
        If ``fids`` is given (alone or combined with other filters), or if
        ``sql`` is combined with ``layer``.
    RuntimeError
        If GDAL was built older than 3.6 or the Arrow stream cannot be
        opened.
    """
    cdef const char *path_c = NULL
    cdef OGRDataSourceH ogr_dataset = NULL
    cdef OGRLayerH ogr_layer = NULL
    cdef ArrowArrayStream stream

    # keep the bytes object alive for as long as path_c is used
    path_b = path.encode('utf-8')
    path_c = path_b

    if fids is not None:
        if (
            where is not None
            or bbox is not None
            or sql is not None
            or skip_features
            or max_features
        ):
            raise ValueError(
                "cannot set both 'fids' and any of 'where', 'bbox', 'sql', "
                "'skip_features' or 'max_features'"
            )
        # Reading by FID is never possible here, so fail before the dataset
        # is opened rather than after.
        raise ValueError("reading by FID not supported for arrow")

    if sql is not None and layer is not None:
        raise ValueError("'sql' parameter cannot be combined with 'layer'")

    ogr_dataset = ogr_open(path_c, 0, kwargs)
    try:
        if sql is None:
            # layer defaults to index 0
            if layer is None:
                layer = 0
            ogr_layer = get_ogr_layer(ogr_dataset, layer)
        else:
            ogr_layer = execute_sql(ogr_dataset, sql, sql_dialect)

        crs = get_crs(ogr_layer)

        # Encoding is derived from the user, from the dataset capabilities / type,
        # or from the system locale
        encoding = (
            encoding
            or detect_encoding(ogr_dataset, ogr_layer)
            or locale.getpreferredencoding()
        )

        fields = get_fields(ogr_layer, encoding)

        if columns is not None:
            # Fields are matched exactly by name, duplicates are dropped.
            # Find index of each field into fields
            idx = np.intersect1d(fields[:,2], columns, return_indices=True)[1]
            fields = fields[idx, :]

        geometry_type = get_geometry_type(ogr_layer)
        geometry_name = get_string(OGR_L_GetGeometryColumn(ogr_layer))

        # Apply the attribute filter
        if where is not None and where != "":
            apply_where_filter(ogr_layer, where)

        # Apply the spatial filter
        if bbox is not None:
            apply_spatial_filter(ogr_layer, bbox)

        # Limit feature range to available range
        # NOTE(review): the validated values are not applied to the Arrow
        # stream below, so the table is not sliced by them - confirm whether
        # they should be rejected or applied.
        skip_features, max_features = validate_feature_range(
            ogr_layer, skip_features, max_features
        )

        # make sure layer is read from beginning
        OGR_L_ResetReading(ogr_layer)

        # Import before the stream is opened so that a missing pyarrow
        # cannot leave an opened, unreleased stream behind.
        import pyarrow as pa

        IF CTE_GDAL_VERSION >= (3, 6, 0):
            if not OGR_L_GetArrowStream(ogr_layer, &stream, NULL):
                raise RuntimeError("Failed to open ArrowArrayStream from Layer")
        ELSE:
            raise RuntimeError("Need GDAL>=3.6 for Arrow support")

        # Review note carried over from the PR discussion (truncated in the
        # source): this part is the main piece that differs from the existing
        # read path.
        # The stream is handed to pyarrow by address and fully consumed here,
        # while the dataset is still open.
        stream_ptr = <uintptr_t> &stream
        table = pa.RecordBatchStreamReader._import_from_c(stream_ptr).read_all()

        meta = {
            'crs': crs,
            'encoding': encoding,
            'fields': fields[:,2], # return only names
            'geometry_type': geometry_type,
            'geometry_name': geometry_name,
        }

    finally:
        if ogr_dataset != NULL:
            # a layer produced by execute_sql must be released explicitly
            if sql is not None:
                GDALDatasetReleaseResultSet(ogr_dataset, ogr_layer)

            GDALClose(ogr_dataset)
            ogr_dataset = NULL

    return (
        meta,
        table,
        None, #geometries,
        None, #field_data
    )
|
||
|
||
def ogr_read_bounds( | ||
str path, | ||
object layer=None, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,109 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one | ||
jorisvandenbossche marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// or more contributor license agreements. See the NOTICE file | ||
// distributed with this work for additional information | ||
// regarding copyright ownership. The ASF licenses this file | ||
// to you under the Apache License, Version 2.0 (the | ||
// "License"); you may not use this file except in compliance | ||
// with the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
// This file is an extract https://github.com/apache/arrow/blob/master/cpp/src/arrow/c/abi.h | ||
// WARNING: DO NOT MODIFY the content as it would break interoperability ! | ||
|
||
#pragma once | ||
|
||
/*! @cond Doxygen_Suppress */ | ||
|
||
#include <stdint.h> | ||
|
||
#ifdef __cplusplus | ||
extern "C" { | ||
#endif | ||
|
||
// Single-bit flag constants (values 1, 2 and 4, so they can be OR-ed together).
// Presumably these are the bits stored in ArrowSchema.flags - confirm against
// the Arrow C data interface specification.
#define ARROW_FLAG_DICTIONARY_ORDERED 1
#define ARROW_FLAG_NULLABLE 2
#define ARROW_FLAG_MAP_KEYS_SORTED 4
|
||
// Type description of an Arrow array.
// The member order and types are part of a shared binary interface (this file
// is an extract of Arrow's abi.h) and must not be changed.
struct ArrowSchema {
  // Array type description
  const char* format;
  const char* name;
  const char* metadata;
  int64_t flags;
  int64_t n_children;
  struct ArrowSchema** children;
  struct ArrowSchema* dictionary;

  // Release callback
  void (*release)(struct ArrowSchema*);
  // Opaque producer-specific data
  void* private_data;
};
|
||
// Data description of one Arrow array.
// The member order and types are part of a shared binary interface (this file
// is an extract of Arrow's abi.h) and must not be changed.
struct ArrowArray {
  // Array data description
  int64_t length;
  int64_t null_count;
  int64_t offset;
  int64_t n_buffers;
  int64_t n_children;
  const void** buffers;
  struct ArrowArray** children;
  struct ArrowArray* dictionary;

  // Release callback
  void (*release)(struct ArrowArray*);
  // Opaque producer-specific data
  void* private_data;
};
// EXPERIMENTAL: C stream interface | ||
|
||
// Stream of Arrow arrays exposed through a table of callbacks.
// The member order and types are part of a shared binary interface (this file
// is an extract of Arrow's abi.h) and must not be changed.
struct ArrowArrayStream {
  // Callback to get the stream type
  // (will be the same for all arrays in the stream).
  //
  // Return value: 0 if successful, an `errno`-compatible error code otherwise.
  //
  // If successful, the ArrowSchema must be released independently from the stream.
  int (*get_schema)(struct ArrowArrayStream*, struct ArrowSchema* out);

  // Callback to get the next array
  // (if no error and the array is released, the stream has ended)
  //
  // Return value: 0 if successful, an `errno`-compatible error code otherwise.
  //
  // If successful, the ArrowArray must be released independently from the stream.
  int (*get_next)(struct ArrowArrayStream*, struct ArrowArray* out);

  // Callback to get optional detailed error information.
  // This must only be called if the last stream operation failed
  // with a non-0 return code.
  //
  // Return value: pointer to a null-terminated character array describing
  // the last error, or NULL if no description is available.
  //
  // The returned pointer is only valid until the next operation on this stream
  // (including release).
  const char* (*get_last_error)(struct ArrowArrayStream*);

  // Release callback: release the stream's own resources.
  // Note that arrays returned by `get_next` must be individually released.
  void (*release)(struct ArrowArrayStream*);

  // Opaque producer-specific data
  void* private_data;
};
|
||
#ifdef __cplusplus | ||
} | ||
#endif | ||
|
||
/*! @endcond */ |
Uh oh!
There was an error while loading. Please reload this page.