Returns the schema of this DataFrame.
Persist this DataFrame with the default storage level (MEMORY_AND_DISK).
Persist this DataFrame with the given storage level.
The storage level to use.
Persist this DataFrame with the default storage level (MEMORY_AND_DISK).
Mark the DataFrame as non-persistent, and remove all blocks for it from memory and disk. This will not un-persist any cached data that is built upon this Dataset.
Whether to block until all blocks are deleted.
Get the DataFrame's current storage level, or StorageLevel.NONE if not persisted.
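A minimal usage sketch (assuming the methods follow Spark's usual Dataset naming of persist, unpersist, and count, and that they return promises; none of this is confirmed by the signatures above):
await df.persist();              // default storage level: MEMORY_AND_DISK
const total = await df.count();  // first action materializes and caches the data
await df.show();                 // served from the cache
await df.unpersist(true);        // block until all blocks are deleted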
Create a write builder for writing to a table using the V2 API.
Displays the Dataset in a tabular form.
Displays the Dataset in a tabular form.
Number of rows to show
Displays the Dataset in a tabular form.
Number of rows to show
If set to true, truncate displayed column values to 20 characters. Default is true.
Displays the Dataset in a tabular form.
Number of rows to show
If set to true, truncate displayed column values to 20 characters. Default is true.
If set to true, print output rows vertically (one line per column value)
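For instance, a hedged sketch of the show overloads described above (the exact signatures and promise-based return values are assumptions):
await df.show();               // 20 rows, values truncated to 20 characters
await df.show(5);              // first 5 rows
await df.show(5, false);       // first 5 rows, no truncation
await df.show(5, true, true);  // vertical output: one line per column value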
Selects a set of SQL expressions. This is a variant of select that accepts SQL expressions.
{{{
  // The following are equivalent:
  df.selectExpr("colA", "colB as newName", "abs(colC)")
  df.select(expr("colA"), expr("colB as newName"), expr("abs(colC)"))
  // TODO: support expr(..) function
}}}
Selects column based on the column name and returns it as a [[org.apache.spark.sql.Column]].
string column name
Column
Selects column based on the column name specified as a regex and returns it as [[org.apache.spark.sql.Column]].
string column name specified as a regex
Column
Selects a metadata column based on its logical column name, and returns it as a [[org.apache.spark.sql.Column]].
A metadata column can be accessed this way even if the underlying data source defines a data column with a conflicting name.
string column name
Column
Returns all rows in this DataFrame as an array of Row objects.
A promise that resolves to an array of Row objects
Returns the first n rows.
The number of rows to return
A promise that resolves to an array of Row objects
Returns the first row. Alias for head().
A promise that resolves to the first Row
Returns the first n rows. Alias for head(n).
The number of rows to return
A promise that resolves to an array of Row objects
Returns the last n rows in the DataFrame.
The number of rows to return from the end
A promise that resolves to an array of Row objects
Returns the number of rows in the Dataset.
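A short sketch of these actions, assuming the method names collect, first, take, tail, and count described above, each returning a promise:
const allRows = await df.collect();  // every row as an array of Row objects
const firstRow = await df.first();   // alias for head()
const top3 = await df.take(3);       // alias for head(3)
const last2 = await df.tail(2);      // last two rows
const numRows = await df.count();    // total row count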
Returns a checkpointed version of this DataFrame. Checkpointing can be used to truncate the
logical plan of this DataFrame, which is especially useful in iterative algorithms where the
plan may grow exponentially. It will be saved to files inside the checkpoint
directory set with spark.sql.checkpoint.location.
Whether to checkpoint this DataFrame immediately (default is true). If false, the checkpoint will be performed when the DataFrame is first materialized.
Returns a locally checkpointed version of this DataFrame. Checkpointing can be used to truncate the logical plan of this DataFrame, which is especially useful in iterative algorithms where the plan may grow exponentially. It will be saved to a local temporary directory.
This is a local checkpoint and is less reliable than a regular checkpoint because it is stored in executor storage and may be lost if executors fail.
Whether to checkpoint this DataFrame immediately (default is true). If false, the checkpoint will be performed when the DataFrame is first materialized.
Optional storageLevel: StorageLevel. The storage level to use for the local checkpoint. If not specified, the default storage level is used.
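A minimal sketch of eager versus lazy checkpointing, assuming the methods are named checkpoint and localCheckpoint and resolve to the checkpointed DataFrame (the StorageLevel constant name is also an assumption):
// Eager checkpoint: truncates the logical plan immediately.
const checkpointed = await df.checkpoint();
// Lazy local checkpoint with an explicit storage level; performed when the
// DataFrame is first materialized and stored in executor storage.
const localCp = await df.localCheckpoint(false, StorageLevel.MEMORY_AND_DISK);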
Specifies some hint on the current DataFrame. As an example, the following code specifies that one side of the join can be broadcast:
{{{ df1.join(df2.hint("broadcast")) }}}
The following code specifies that this Dataset could be rebalanced with the given number of partitions:
{{{ df1.hint("rebalance", 10) }}}
The name of the hint.
The parameters of the hint. Each parameter should be a Column or Expression, or a value that can be converted into a Literal.
Returns a new DataFrame with columns renamed.
New column names. If empty, returns this DataFrame unchanged.
A new DataFrame with the specified column names
Returns a new DataFrame with the specified schema applied.
The schema to apply to this DataFrame
A new DataFrame with the specified schema
Returns a new DataFrame by taking the first n rows.
The number of rows to take
A new DataFrame with at most n rows
Returns a new DataFrame by skipping the first n rows.
The number of rows to skip
A new DataFrame with the first n rows removed
Filters rows using the given SQL expression string.
A SQL expression string representing the filter condition
A new DataFrame with rows matching the condition
Filters rows using the given SQL expression string. Alias for filter().
A SQL expression string representing the filter condition
A new DataFrame with rows matching the condition
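For example (a sketch assuming filter and where accept a SQL expression string as documented above):
const adults = df.filter("age >= 18 AND country = 'US'");
const sameThing = df.where("age >= 18 AND country = 'US'"); // where() is an alias for filter()
await adults.show();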
Returns true if this DataFrame contains one or more sources that continuously return data as it arrives.
A promise that resolves to true if this is a streaming DataFrame
Returns a new Dataset containing the union of rows in this Dataset and another Dataset.
This is equivalent to UNION DISTINCT in SQL.
To do a SQL-style union that keeps duplicates, use [[unionAll]].
Also as standard in SQL, this function resolves columns by position (not by name):
{{{
  val df1 = Seq((1, 2, 3)).toDF("col0", "col1", "col2")
  val df2 = Seq((4, 5, 6)).toDF("col1", "col2", "col0")
  df1.union(df2).show()
  // output:
  // +----+----+----+
  // |col0|col1|col2|
  // +----+----+----+
  // |   1|   2|   3|
  // |   4|   5|   6|
  // +----+----+----+
}}}
Notice that the column positions in the schema aren't necessarily matched with the fields in the strongly typed objects in a Dataset. This function resolves columns by their positions in the schema, not the fields in the strongly typed objects. Use [[unionByName]] to resolve columns by field name in the typed objects.
Returns a new Dataset containing the union of rows in this Dataset and another Dataset.
This is equivalent to UNION ALL in SQL.
To do a SQL-style set union (that does deduplication of elements), use [[union]].
Also as standard in SQL, this function resolves columns by position (not by name).
Returns a new Dataset containing the union of rows in this Dataset and another Dataset.
Unlike [[union]], this function resolves columns by name (not by position).
This is equivalent to UNION ALL in SQL with column name matching.
When the parameter allowMissingColumns is true, the set of column names
in this and other Dataset can differ; missing columns will be filled with null.
Returns a new Dataset containing the union of rows in this Dataset and another Dataset.
Unlike [[union]], this function resolves columns by name (not by position).
This is equivalent to UNION ALL in SQL with column name matching.
When the parameter allowMissingColumns is true, the set of column names
in this and other Dataset can differ; missing columns will be filled with null.
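A hedged sketch of name-based union, assuming the method is named unionByName and takes the optional allowMissingColumns flag described above:
// Columns are matched by name, not by position.
const combined = df1.unionByName(df2);
// With allowMissingColumns = true, columns present in only one input are filled with null.
const lenient = df1.unionByName(df2, true);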
Returns a new DataFrame that has exactly numPartitions partitions.
This operation requires a shuffle, making it a wide transformation.
The target number of partitions. Must be positive.
Returns a new DataFrame partitioned by the given partitioning expressions,
using numPartitions partitions. The resulting DataFrame is hash partitioned.
This operation requires a shuffle, making it a wide transformation.
The target number of partitions
Column expressions to partition by
Returns a new DataFrame that has exactly numPartitions partitions, when fewer partitions are
requested. If a larger number of partitions is requested, it will stay at the current number of
partitions. Similar to coalesce defined on an RDD, this operation results in a narrow dependency,
e.g. if you go from 1000 partitions to 100 partitions, there will not be a shuffle; instead each
of the 100 new partitions will claim 10 of the current partitions.
However, if you're doing a drastic coalesce, e.g. to numPartitions = 1, this may result in your computation taking place on fewer nodes than you like (e.g. one node in the case of numPartitions = 1). To avoid this, you can call repartition(). This will add a shuffle step, but means the current upstream partitions will be executed in parallel (per whatever the current partitioning is).
The target number of partitions. Must be positive.
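A sketch contrasting the two, assuming the methods are named coalesce and repartition and that col() can be used as in the examples elsewhere on this page:
// Narrow dependency: no shuffle, never more than the current number of partitions.
const fewer = df.coalesce(10);
// Full shuffle: exactly 200 hash partitions; upstream work stays parallel.
const reshuffled = df.repartition(200);
// Hash-partition by an expression.
const byDept = df.repartition(50, col('department'));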
Returns a new DataFrame partitioned by the given partitioning expressions. The resulting DataFrame is range partitioned.
At least one partition-by expression must be specified. When no explicit sort order is specified, "ascending nulls first" is assumed. Note, the rows are not sorted in each partition of the resulting DataFrame.
This operation requires a shuffle, making it a wide transformation.
Column expressions to partition by
Returns a new DataFrame partitioned by the given partitioning expressions into
numPartitions. The resulting DataFrame is range partitioned.
At least one partition-by expression must be specified. When no explicit sort order is specified, "ascending nulls first" is assumed. Note, the rows are not sorted in each partition of the resulting DataFrame.
This operation requires a shuffle, making it a wide transformation.
The target number of partitions
Column expressions to partition by
Apply a function to each partition of the DataFrame.
This method applies a user-defined function to each partition of the DataFrame. The function should take an iterator of rows and return an iterator of rows.
Python code as a string defining the partition processing function
The output schema for the transformed DataFrame
Python version (default: '3.11')
A new DataFrame with the function applied to each partition
const pythonCode = `
def process_partition(partition):
    for row in partition:
        yield (row.id * 2, row.value)
`;
const schema = DataTypes.createStructType([
DataTypes.createStructField('id', DataTypes.IntegerType, false),
DataTypes.createStructField('value', DataTypes.StringType, false),
]);
const result = df.mapPartitions(pythonCode, schema);
Co-group two DataFrames and apply a function to each group.
This method groups two DataFrames by the specified columns and applies a user-defined function to each group pair. The function receives the group key and iterators for rows from both DataFrames.
The other DataFrame to co-group with
Columns to group by for this DataFrame
Columns to group by for the other DataFrame
Python code as a string defining the co-group processing function
The output schema for the transformed DataFrame
Python version (default: '3.11')
A new DataFrame with the function applied to each co-group
const pythonCode = `
def cogroup_func(key, left_rows, right_rows):
    for l in left_rows:
        for r in right_rows:
            yield (key.id, l.value, r.value)
`;
const schema = DataTypes.createStructType([
DataTypes.createStructField('id', DataTypes.IntegerType, false),
DataTypes.createStructField('left_value', DataTypes.StringType, false),
DataTypes.createStructField('right_value', DataTypes.StringType, false),
]);
const result = df1.coGroupMap(df2, [col('id')], [col('id')], pythonCode, schema);
Groups the Dataset using the specified columns, so we can run aggregation on them. See [[RelationalGroupedDataset]] for all the available aggregate functions.
{{{
  // Compute the average for all numeric columns grouped by department.
  ds.groupBy($"department").avg()

  // Compute the max age and average salary, grouped by department and gender.
  ds.groupBy($"department", $"gender").agg(Map(
    "salary" -> "avg",
    "age" -> "max"
  ))
}}}
Groups the Dataset using the specified columns, so we can run aggregation on them. See [[RelationalGroupedDataset]] for all the available aggregate functions.
{{{
  // Compute the average for all numeric columns grouped by department.
  ds.groupBy($"department").avg()

  // Compute the max age and average salary, grouped by department and gender.
  ds.groupBy($"department", $"gender").agg(Map(
    "salary" -> "avg",
    "age" -> "max"
  ))
}}}
Create a multi-dimensional rollup for the current Dataset using the specified columns, so we can run aggregation on them. See [[RelationalGroupedDataset]] for all the available aggregate functions.
{{{
  // Compute the average for all numeric columns rolled up by department and group.
  ds.rollup(col("department"), col("group")).avg()
}}}
Create a multi-dimensional rollup for the current Dataset using the specified columns, so we can run aggregation on them. See [[RelationalGroupedDataset]] for all the available aggregate functions.
{{{
  // Compute the average for all numeric columns rolled up by department and group.
  ds.rollup(col("department"), col("group")).avg()
}}}
Create a multi-dimensional cube for the current Dataset using the specified columns, so we can run aggregation on them. See [[RelationalGroupedDataset]] for all the available aggregate functions.
{{{
  // Compute the average for all numeric columns cubed by department and group.
  ds.cube($"department", $"group").avg()

  // Compute the max age and average salary, cubed by department and gender.
  ds.cube($"department", $"gender").agg(Map(
    "salary" -> "avg",
    "age" -> "max"
  ))
}}}
Create a multi-dimensional cube for the current Dataset using the specified columns, so we can run aggregation on them. See [[RelationalGroupedDataset]] for all the available aggregate functions.
{{{
  // Compute the average for all numeric columns cubed by department and group.
  ds.cube($"department", $"group").avg()

  // Compute the max age and average salary, cubed by department and gender.
  ds.cube($"department", $"gender").agg(Map(
    "salary" -> "avg",
    "age" -> "max"
  ))
}}}
Join with another DataFrame, using the given join expression. The following performs a full
outer join between df1 and df2.
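A sketch of that join (the Column.equalTo comparison used to build the join expression is an assumption, not documented in this section):
const joined = df1.join(df2, df1.col('key').equalTo(df2.col('key')), 'full_outer');
await joined.show();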
Right side of the join.
Join expression.
Type of join to perform. Default inner. Must be one of:
inner, cross, outer, full, fullouter, full_outer, left, leftouter,
left_outer, right, rightouter, right_outer, semi, leftsemi, left_semi,
anti, leftanti, left_anti.
Join with another DataFrame using the list of columns to join on.
Right side of the join.
Names of columns to join on. These columns must exist on both sides.
Type of join to perform. Default inner. Must be one of:
inner, cross, outer, full, fullouter, full_outer, left, leftouter,
left_outer, right, rightouter, right_outer, semi, leftsemi, left_semi,
anti, leftanti, left_anti.
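For example (a sketch assuming the column-name variant accepts an array of names and one of the join type strings listed above):
// Inner join (the default) on a single shared column.
const inner = df1.join(df2, ['id']);
// Left outer join on two shared columns.
const left = df1.join(df2, ['id', 'date'], 'left_outer');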
Perform an as-of join between this DataFrame and another DataFrame.
This is similar to a left-join except that we match on nearest key rather than equal keys. For each row in the left DataFrame, we find the closest match in the right DataFrame based on the as-of column(s) and join condition.
Right side of the join.
Column to join on from the left DataFrame.
Column to join on from the right DataFrame.
Optional joinExprs: Column. An additional join expression.
Optional joinType: string. Type of join to perform. Default inner.
Optional tolerance: Column. Tolerance for inexact matches.
Optional allowExactMatches: boolean. Whether to allow exact matches. Default true.
Optional direction: string. Direction of search. One of: backward, forward, nearest. Default backward.
Perform an as-of join between this DataFrame and another DataFrame using column names.
Right side of the join.
Column name to join on from the left DataFrame.
Column name to join on from the right DataFrame.
Optional usingColumns: string[]. Names of columns to join on. These columns must exist on both sides.
Optional joinType: string. Type of join to perform. Default inner.
Optional tolerance: Column. Tolerance for inexact matches.
Optional allowExactMatches: boolean. Whether to allow exact matches. Default true.
Optional direction: string. Direction of search. One of: backward, forward, nearest. Default backward.
Perform a lateral join between this DataFrame and another DataFrame.
Lateral joins allow the right side to reference columns from the left side. This is useful for operations like exploding arrays or applying table-valued functions.
Unpivot a DataFrame from wide format to long format, optionally leaving identifier columns
set. This is the reverse to groupBy(...).pivot(...).agg(...), except for the aggregation,
which cannot be reversed.
This function is useful to massage a DataFrame into a format where some columns are
identifier columns ("ids"), while all other columns ("values") are "unpivoted" to the rows,
leaving just two non-id columns, named as given by variableColumnName and
valueColumnName.
When no "id" columns are given, the unpivoted DataFrame consists of only the "variable" and "value" columns.
All "value" columns must share a least common data type. Unless they are the same data type,
all "value" columns are cast to the nearest common data type. For instance, types
IntegerType and LongType are cast to LongType, while IntegerType and StringType do
not have a common data type and unpivot fails with an AnalysisException.
Id columns
Name of the variable column
Name of the value column
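A hedged sketch, assuming the method is named unpivot and takes the id columns, variable column name, and value column name listed above:
// Wide to long: keep 'id' as the identifier, turn every other column into (variable, value) rows.
const longDf = df.unpivot([col('id')], 'variable', 'value');
await longDf.show();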
Unpivot a DataFrame from wide format to long format, optionally leaving identifier columns
set. This is the reverse to groupBy(...).pivot(...).agg(...), except for the aggregation,
which cannot be reversed.
This function is useful to massage a DataFrame into a format where some columns are
identifier columns ("ids"), while all other columns ("values") are "unpivoted" to the rows,
leaving just two non-id columns, named as given by variableColumnName and
valueColumnName.
When no "id" columns are given, the unpivoted DataFrame consists of only the "variable" and "value" columns.
All "value" columns must share a least common data type. Unless they are the same data type,
all "value" columns are cast to the nearest common data type. For instance, types
IntegerType and LongType are cast to LongType, while IntegerType and StringType do
not have a common data type and unpivot fails with an AnalysisException.
Unpivot a DataFrame from wide format to long format, optionally leaving identifier columns
set. This is the reverse to groupBy(...).pivot(...).agg(...), except for the aggregation,
which cannot be reversed. This is an alias for unpivot.
Id columns
Name of the variable column
Name of the value column
Unpivot a DataFrame from wide format to long format, optionally leaving identifier columns
set. This is the reverse to groupBy(...).pivot(...).agg(...), except for the aggregation,
which cannot be reversed. This is an alias for unpivot.
Transposes a DataFrame such that the values in the specified index column become the new columns of the DataFrame.
For example:
{{{
  val df = Seq(("A", 1, 2), ("B", 3, 4)).toDF("id", "val1", "val2")
  df.show()
  // output:
  // +---+----+----+
  // | id|val1|val2|
  // +---+----+----+
  // |  A|   1|   2|
  // |  B|   3|   4|
  // +---+----+----+

  df.transpose($"id").show()
  // output:
  // +----+---+---+
  // | key|  A|  B|
  // +----+---+---+
  // |val1|  1|  3|
  // |val2|  2|  4|
  // +----+---+---+
  // schema:
  // root
  //  |-- key: string (nullable = false)
  //  |-- A: integer (nullable = true)
  //  |-- B: integer (nullable = true)

  df.transpose().show()
  // output:
  // +----+---+---+
  // | key|  A|  B|
  // +----+---+---+
  // |val1|  1|  3|
  // |val2|  2|  4|
  // +----+---+---+
  // schema:
  // root
  //  |-- key: string (nullable = false)
  //  |-- A: integer (nullable = true)
  //  |-- B: integer (nullable = true)
}}}
Transposes a DataFrame such that the values in the specified index column become the new columns of the DataFrame.
For example:
{{{
  val df = Seq(("A", 1, 2), ("B", 3, 4)).toDF("id", "val1", "val2")
  df.show()
  // output:
  // +---+----+----+
  // | id|val1|val2|
  // +---+----+----+
  // |  A|   1|   2|
  // |  B|   3|   4|
  // +---+----+----+

  df.transpose($"id").show()
  // output:
  // +----+---+---+
  // | key|  A|  B|
  // +----+---+---+
  // |val1|  1|  3|
  // |val2|  2|  4|
  // +----+---+---+
  // schema:
  // root
  //  |-- key: string (nullable = false)
  //  |-- A: integer (nullable = true)
  //  |-- B: integer (nullable = true)

  df.transpose().show()
  // output:
  // +----+---+---+
  // | key|  A|  B|
  // +----+---+---+
  // |val1|  1|  3|
  // |val2|  2|  4|
  // +----+---+---+
  // schema:
  // root
  //  |-- key: string (nullable = false)
  //  |-- A: integer (nullable = true)
  //  |-- B: integer (nullable = true)
}}}
The single column that will be treated as the index for the transpose operation. This column will be used to pivot the data, transforming the DataFrame such that the values of the indexColumn become the new columns in the transposed DataFrame.
A distributed collection of data organized into named columns.
Remarks
DataFrame is the primary abstraction in Spark SQL for working with structured data. It provides a domain-specific language for distributed data manipulation and supports a wide variety of operations including selecting, filtering, joining, and aggregating.
DataFrames are lazy - operations are not executed until an action (like collect, count, or show) is called. This allows Spark to optimize the execution plan.
Example
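A minimal end-to-end sketch (the spark.sql entry point and promise-based actions are assumptions; the DataFrame methods used are the ones documented above):
// Build a DataFrame, add lazy transformations, then trigger execution with an action.
const df = await spark.sql('SELECT * FROM employees');
const result = df
  .filter('salary > 50000')
  .selectExpr('name', 'salary * 1.1 as adjusted_salary');
await result.show();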
Since
1.0.0