サポートされているデータ型
Amazon Redshift の以下のデータ型は Spark コネクタでサポートされています。Amazon Redshift でサポートされているデータ型の完全なリストについては、「データ型」を参照してください。データ型が下の表にない場合、そのデータ型は Spark コネクタではサポートされていません。
データ型 | エイリアス |
---|---|
SMALLINT | INT2 |
INTEGER | INT、INT4 |
BIGINT | INT8 |
DECIMAL | NUMERIC |
REAL | FLOAT4 |
DOUBLE PRECISION | FLOAT8、FLOAT |
BOOLEAN | BOOL |
CHAR | CHARACTER、NCHAR、BPCHAR |
VARCHAR | CHARACTER VARYING、NVARCHAR、TEXT |
DATE | |
TIMESTAMP | Timestamp without time zone |
TIMESTAMPTZ | Timestamp with time zone |
SUPER | |
TIME | Time without time zone |
TIMETZ | Time with time zone |
VARBYTE | VARBINARY、BINARY VARYING |
複雑なデータ型
Spark コネクタを使用して、Redshift の SUPER データ型列との間で、ArrayType
、MapType
、StructType
のような Spark の複雑なデータ型を読み書きできます。読み取りオペレーションにスキーマを指定すると、列のデータは Spark 内の対応する複合型 (ネストされた型も含む) に変換されます。さらに、autopushdown
を有効にすると、ネストされた属性、マップ値、配列インデックスの投影が Redshift にプッシュダウンされるため、データの一部だけにアクセスするときに、ネストされたデータ構造全体をアンロードする必要がなくなります。
コネクタから DataFrames を書き込む場合、MapType
型の列 (StringType
を使用)、StructType
、または ArrayType
は、Redshift SUPER データ型列に書き込まれます。これらのネストされたデータ構造を記述する場合、tempformat
パラメータは CSV
、CSV GZIP
、または PARQUET
である必要があります。AVRO
を使用すると、例外が発生します。また、StringType
以外のキータイプを持つ MapType
データ構造を記述すると、例外が発生します。
StructType
次の例は、構造体を含む SUPER データ型のテーブルを作成する方法を示しています。
create table contains_super (a super);
その後、次の例のようなスキーマを使用して、テーブルの SUPER 列 a
から StringType
フィールド hello
をクエリできます。
import org.apache.spark.sql.types._
val sc = // existing SparkContext
val sqlContext = new SQLContext(sc)
val schema = StructType(StructField("a", StructType(StructField("hello", StringType) ::Nil)) :: Nil)
val helloDF = sqlContext.read
.format("io.github.spark_redshift_community.spark.redshift")
.option("url", jdbcURL )
.option("tempdir", tempS3Dir)
.option("dbtable", "contains_super")
.schema(schema)
.load().selectExpr("a.hello")
次の例は、構造体を a
列に書き込む方法を示しています。
import org.apache.spark.sql.types._
import org.apache.spark.sql._
val sc = // existing SparkContext
val sqlContext = new SQLContext(sc)
val schema = StructType(StructField("a", StructType(StructField("hello", StringType) ::Nil)) :: Nil)
val data = sc.parallelize(Seq(Row(Row("world"))))
val mydf = sqlContext.createDataFrame(data, schema)
mydf.write.format("io.github.spark_redshift_community.spark.redshift").
option("url", jdbcUrl).
option("dbtable", tableName).
option("tempdir", tempS3Dir).
option("tempformat", "CSV").
mode(SaveMode.Append).save
MapType
MapType
を使用してデータを表す場合は、スキーマ内の MapType
データ構造を使用して、マップ内のキーに対応する値を取得します。MapType
データ構造のすべてのキーは String 型でなければならず、すべての値は int のように同じ型でなければなりません。
次の例は、列 a
のキー hello
の値を取得する方法を示しています。
import org.apache.spark.sql.types._
val sc = // existing SparkContext
val sqlContext = new SQLContext(sc)
val schema = StructType(StructField("a", MapType(StringType, IntegerType))::Nil)
val helloDF = sqlContext.read
.format("io.github.spark_redshift_community.spark.redshift")
.option("url", jdbcURL )
.option("tempdir", tempS3Dir)
.option("dbtable", "contains_super")
.schema(schema)
.load().selectExpr("a['hello']")
ArrayType
列に構造体ではなく配列が含まれている場合は、コネクタを使用して配列の最初の要素をクエリできます。
import org.apache.spark.sql.types._
val sc = // existing SparkContext
val sqlContext = new SQLContext(sc)
val schema = StructType(StructField("a", ArrayType(IntegerType)):: Nil)
val helloDF = sqlContext.read
.format("io.github.spark_redshift_community.spark.redshift")
.option("url", jdbcURL )
.option("tempdir", tempS3Dir)
.option("dbtable", "contains_super")
.schema(schema)
.load().selectExpr("a[0]")
制約事項
Spark コネクタで複雑なデータ型を使用する場合は、次の制限があります。
-
ネストされた構造体フィールド名とマップキーはすべて小文字でなければなりません。大文字を含む複雑なフィールド名をクエリする場合は、回避策として、スキーマを省略し、
from_json
spark 関数を使用して、返された文字列をローカルに変換するよう試みることができます。 -
読み取りまたは書き込みオペレーションで使用されるマップフィールドには、
StringType
キーのみが必要です。 -
CSV
、CSV GZIP
、およびPARQUET
のみが、複合型を Redshift に書き込むためにサポートされる tempformat 値です。AVRO
を使用しようとすると、例外が発生します。