#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Input classes for ETL via GDAL OGR.
#
# Author: Just van den Broecke
#
import subprocess
from stetl.component import Config
from stetl.util import Util, gdal, ogr
from stetl.input import Input
from stetl.packet import FORMAT
log = Util.get_log('ogrinput')
[docs]class OgrInput(Input):
"""
Direct GDAL OGR input via Python OGR wrapper. Via the Python API http://gdal.org/python
an OGR data source is accessed and from each layer the Features are read.
Each Layer corresponds to a "doc", so for multi-layer sources the 'end-of-doc' flag is
set after a Layer has been read.
This input can read almost any geospatial dataformat. One can use the features directly
in a Stetl Filter or use a converter to e.g. convert to GeoJSON structures.
produces=FORMAT.ogr_feature or FORMAT.ogr_feature_array (all features)
"""
# Start attribute config meta
# Applying Decorator pattern with the Config class to provide
# read-only config values from the configured properties.
@Config(ptype=str, default=None, required=True)
[docs] def data_source(self):
"""
String denoting the OGR datasource. Usually a path to a file like "path/rivers.shp" or connection string
to PostgreSQL like "PG: host=localhost dbname='rivers' user='postgres'".
Required: True
Default: None
"""
pass
@Config(ptype=str, default=None, required=False)
[docs] def source_format(self):
"""
Instructs GDAL to use driver by that name to open datasource. Not required for
many standard formats that are self-describing like ESRI Shapefile.
Examples: 'PostgreSQL', 'GeoJSON' etc
Required: False
Default: None
"""
pass
@Config(ptype=dict, default=None, required=False)
[docs] def source_options(self):
"""
Custom datasource-specific options. Used in gdal.SetConfigOption().
Type: dictionary
Required: False
Default: None
"""
pass
@Config(ptype=str, default=None, required=False)
[docs] def sql(self):
"""
String with SQL query. Mandatory for PostgreSQL OGR source.
Required: False (True for PostgreSQL OGR source)
Default: None
"""
pass
# End attribute config meta
# Constructor
def __init__(self, configdict, section):
Input.__init__(self, configdict, section, produces=[FORMAT.ogr_feature, FORMAT.ogr_feature_array])
def init(self):
self.ogr = ogr
# http://trac.osgeo.org/gdal/wiki/PythonGotchas
self.gdal = gdal
self.gdal.UseExceptions()
log.info("Using GDAL/OGR version: %d" % int(gdal.VersionInfo('VERSION_NUM')))
# GDAL error handler function
# http://pcjericks.github.io/py-gdalogr-cookbook/gdal_general.html
def gdal_error_handler(err_class, err_num, err_msg):
err_type = {
gdal.CE_None: 'None',
gdal.CE_Debug: 'Debug',
gdal.CE_Warning: 'Warning',
gdal.CE_Failure: 'Failure',
gdal.CE_Fatal: 'Fatal'
}
err_msg = err_msg.replace('\n', ' ')
err_class = err_type.get(err_class, 'None')
log.error('Error Number: %s, Type: %s, Msg: %s' % (err_num, err_class, err_msg))
# install error handler
self.gdal.PushErrorHandler(gdal_error_handler)
# Raise a dummy error for testing
# self.gdal.Error(1, 2, 'test error')
if self.source_options:
for k in self.source_options:
self.gdal.SetConfigOption(k, self.source_options[k])
# Open OGR data source in read-only mode.
if self.source_format:
self.data_source_p = ogr.GetDriverByName(self.source_format).Open(self.data_source, 0)
else:
self.data_source_p = self.ogr.Open(self.data_source, 0)
# Report failure if failed
if self.data_source_p is None:
log.error("Cannot open OGR datasource: %s with the following drivers." % self.data_source)
for iDriver in range(self.ogr.GetDriverCount()):
log.info(" -> " + self.ogr.GetDriver(iDriver).GetName())
raise Exception()
else:
# Open ok: initialize
self.layer = None
if self.sql:
self.layer_count = 1
self.layer_idx = -1
else:
self.layer_count = self.data_source_p.GetLayerCount()
self.layer_idx = 0
log.info("Opened OGR source ok: %s layer count=%d" % (self.data_source, self.layer_count))
def read(self, packet):
if not self.data_source_p:
log.info("End reading from: %s" % self.data_source)
return packet
if self.layer is None:
if self.sql and self.layer_idx == -1:
# PostgreSQL: Layer is gotten via Query
# http://trac.osgeo.org/postgis/wiki/UsersWikiOGR
self.layer = self.data_source_p.ExecuteSQL(self.sql)
self.layer_idx = 0
elif self.layer_idx < self.layer_count:
self.layer = self.data_source_p.GetLayer(self.layer_idx)
self.layer_idx += 1
if self.layer is None:
log.error("Could not fetch layer %d" % 0)
raise Exception()
log.info("Start reading from OGR Source: %s, Layer: %s" % (self.data_source, self.layer.GetName()))
else:
# No more Layers left: cleanup
packet.set_end_of_stream()
log.info("Closing OGR source: %s" % self.data_source)
# Destroy not required anymore: http://trac.osgeo.org/gdal/wiki/PythonGotchas
# self.data_source_p.Destroy()
self.data_source_p = None
return packet
# Return all features from Layer (ogr_feature_array) or next feature (ogr_feature)
if self.output_format == FORMAT.ogr_feature_array:
# Assemble all features
features = list()
for feature in self.layer:
features.append(feature)
packet.data = features
log.info("End reading all features from Layer: %s count=%d" % (self.layer.GetName(), len(features)))
packet.set_end_of_doc()
self.layer = None
else:
# Next feature
feature = self.layer.GetNextFeature()
if feature:
packet.data = feature
else:
log.info("End reading from Layer: %s" % self.layer.GetName())
packet.set_end_of_doc()
self.layer = None
return packet
[docs]class OgrPostgisInput(Input):
"""
Input from PostGIS via ogr2ogr command. For now hardcoded to produce an ogr GML line stream.
OgrInput may be a better alternative.
Alternatives: either stetl.input.PostgresqlInput or stetl.input.OgrInput.
produces=FORMAT.xml_line_stream
"""
# TODO make this template configurable so we can have generic ogr2ogr input....
pg_conn_tmpl = "PG:host=%s dbname=%s active_schema=%s user=%s password=%s port=%s"
cmd_tmpl = 'ogr2ogr|-t_srs|%s|-s_srs|%s|-f|GML|%s|-dsco|FORMAT=%s|-lco|DIM=%s|%s|-SQL|%s|-nln|%s|%s'
# Constructor
def __init__(self, configdict, section):
Input.__init__(self, configdict, section, produces=FORMAT.xml_line_stream)
def init(self):
self.ogr_process = None
self.eof_stdout = False
self.eof_stderr = False
in_pg_host = self.cfg.get('in_pg_host', 'localhost')
in_pg_db = self.cfg.get('in_pg_db')
in_pg_schema = self.cfg.get('in_pg_schema', 'public')
in_pg_user = self.cfg.get('in_pg_user', 'postgres')
in_pg_password = self.cfg.get('in_pg_password', 'postgres')
in_pg_port = self.cfg.get('in_pg_port', '5432')
in_srs = self.cfg.get('in_srs')
in_pg_sql = self.cfg.get('in_pg_sql')
out_srs = self.cfg.get('out_srs')
out_file = '/vsistdout/'
out_gml_format = self.cfg.get('out_gml_format')
out_dimension = self.cfg.get('out_dimension', '2')
out_layer_name = self.cfg.get('out_layer_name')
out_geotype = self.cfg.get('out_geotype', '')
#
# Build ogr2ogr command line
#
# PostGIS PG: options
self.pg = OgrPostgisInput.pg_conn_tmpl % (
in_pg_host, in_pg_db, in_pg_schema, in_pg_user, in_pg_password, in_pg_port)
# Entire ogr2ogr command line
self.cmd = OgrPostgisInput.cmd_tmpl % (
out_srs, in_srs, out_file, out_gml_format, out_dimension, self.pg, in_pg_sql, out_layer_name, out_geotype)
# Make array to make it easy for Popen with quotes etc
self.cmd = self.cmd.split('|')
def exec_cmd(self):
log.info("start ogr2ogr cmd = %s" % repr(self.cmd))
self.ogr_process = subprocess.Popen(self.cmd,
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
err_line = self.readline_err()
if err_line:
log.warning('ogr2ogr: %s ' % err_line)
# exit_code = self.ogr_process.returncode
def readline(self):
if self.eof_stdout is True:
return None
line = self.ogr_process.stdout.readline()
if not line:
self.eof_stdout = True
log.info("EOF stdout")
return None
line = line.decode('utf-8')
return line
def readline_err(self):
if self.eof_stderr is True:
return None
line = self.ogr_process.stderr.readline()
if not line:
self.eof_stderr = True
log.info("EOF stderr")
return None
return line
def read(self, packet):
if packet.is_end_of_stream():
return packet
# First time here : spawn the ogr2ogr command
if self.ogr_process is None:
# New splitter for each layer
self.exec_cmd()
packet.data = self.readline()
if not packet.data:
err_line = self.readline_err()
if err_line:
log.warning('ogr2ogr: %s ' % err_line)
packet.set_end_of_stream()
log.info('EOF ogr2ogr output')
return packet