支持的数据类型
Spark 连接器支持 Amazon Redshift 中的以下数据类型。有关 Amazon Redshift 中支持的数据类型的完整列表,请参阅数据类型。如果某个数据类型不在下表中,则 Spark 连接器不支持该数据类型。
数据类型 | 别名 |
---|---|
SMALLINT | INT2 |
INTEGER | INT、INT4 |
BIGINT | INT8 |
DECIMAL | NUMERIC |
REAL | FLOAT4 |
DOUBLE PRECISION | FLOAT8、FLOAT |
BOOLEAN | BOOL |
CHAR | CHARACTER、NCHAR、BPCHAR |
VARCHAR | CHARACTER VARYING、NVARCHAR、TEXT |
DATE | |
TIMESTAMP | Timestamp without time zone |
TIMESTAMPTZ | Timestamp with time zone |
SUPER | |
TIME | TIME WITHOUT TIME ZONE |
TIMETZ | Time with time zone |
VARBYTE | VARBINARY,BINARY VARYING |
复杂的数据类型
您可以使用 spark 连接器在 Redshift SUPER 数据类型列中读写 Spark 复杂数据类型,如 ArrayType
、MapType
和 StructType
。如果您在读取操作期间提供架构,则该列中的数据将在 Spark 中转换为相应的复杂类型,包括任何嵌套类型。此外,如果启用 autopushdown
,嵌套属性、映射值和数组索引的投影将下推到 Redshift,这样,当只访问一部分数据时,就不再需要卸载整个嵌套数据结构。
从连接器写入 DataFrame 时,任何类型为 MapType
(使用 StringType
)、StructType
或 ArrayType
的列都会写入 Redshift SUPER 数据类型列。在写入这些嵌套数据结构时,tempformat
参数必须为类型 CSV
、CSV GZIP
或 PARQUET
。使用 AVRO
将导致异常。写入一个键类型不是 StringType
的 MapType
数据结构也会导致异常。
StructType
以下示例演示如何使用包含结构的 SUPER 数据类型创建表
create table contains_super (a super);
然后,您可以使用连接器,使用下面示例中的类似架构,从表中的 SUPER 列 a
查询 StringType
字段 hello
。
import org.apache.spark.sql.types._ val sc = // existing SparkContext val sqlContext = new SQLContext(sc) val schema = StructType(StructField("a", StructType(StructField("hello", StringType) ::Nil)) :: Nil) val helloDF = sqlContext.read .format("io.github.spark_redshift_community.spark.redshift") .option("url", jdbcURL ) .option("tempdir", tempS3Dir) .option("dbtable", "contains_super") .schema(schema) .load().selectExpr("a.hello")
以下示例演示如何向列 a
写入结构。
import org.apache.spark.sql.types._ import org.apache.spark.sql._ val sc = // existing SparkContext val sqlContext = new SQLContext(sc) val schema = StructType(StructField("a", StructType(StructField("hello", StringType) ::Nil)) :: Nil) val data = sc.parallelize(Seq(Row(Row("world")))) val mydf = sqlContext.createDataFrame(data, schema) mydf.write.format("io.github.spark_redshift_community.spark.redshift"). option("url", jdbcUrl). option("dbtable", tableName). option("tempdir", tempS3Dir). option("tempformat", "CSV"). mode(SaveMode.Append).save
MapType
如果您更喜欢使用 MapType
来表示数据,那么您可以在架构中使用 MapType
数据结构,并检索映射中与键对应的值。请注意,MapType
数据结构中的所有键都必须是 String 类型,并且所有值都必须是相同的类型,例如 int。
以下示例演示如何获取列 a
中键 hello
的值。
import org.apache.spark.sql.types._ val sc = // existing SparkContext val sqlContext = new SQLContext(sc) val schema = StructType(StructField("a", MapType(StringType, IntegerType))::Nil) val helloDF = sqlContext.read .format("io.github.spark_redshift_community.spark.redshift") .option("url", jdbcURL ) .option("tempdir", tempS3Dir) .option("dbtable", "contains_super") .schema(schema) .load().selectExpr("a['hello']")
ArrayType
如果该列包含数组而不是结构,则可以使用连接器查询数组中的第一个元素。
import org.apache.spark.sql.types._ val sc = // existing SparkContext val sqlContext = new SQLContext(sc) val schema = StructType(StructField("a", ArrayType(IntegerType)):: Nil) val helloDF = sqlContext.read .format("io.github.spark_redshift_community.spark.redshift") .option("url", jdbcURL ) .option("tempdir", tempS3Dir) .option("dbtable", "contains_super") .schema(schema) .load().selectExpr("a[0]")
限制
通过 spark 连接器使用复杂数据类型有以下限制:
-
所有嵌套的结构字段名称和映射键必须为小写。如果查询带有大写字母的复杂字段名称,可以尝试省略架构,并使用
from_json
spark 函数在本地转换返回的字符串来作为解决方法。 -
在读取或写入操作中使用的任何映射字段都必须只有
StringType
键。 -
只有
CSV
、CSV GZIP
和PARQUET
是支持将复杂类型写入 Redshift 的临时格式值。尝试使用AVRO
会引发异常。