From 03b79393e71910a33a39864e563fcbeb2de56658 Mon Sep 17 00:00:00 2001 From: Niharika Dutta Date: Sun, 19 Apr 2020 22:31:05 -0700 Subject: [PATCH 1/5] Adding section for UDF serialization --- docs/broadcast-guide.md | 92 +++++++++++++++++++++ docs/udf-guide.md | 172 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 264 insertions(+) create mode 100644 docs/broadcast-guide.md create mode 100644 docs/udf-guide.md diff --git a/docs/broadcast-guide.md b/docs/broadcast-guide.md new file mode 100644 index 000000000..4286c569e --- /dev/null +++ b/docs/broadcast-guide.md @@ -0,0 +1,92 @@ +# Guide to using Broadcast Variables + +This is a guide to show how to use broadcast variables in .NET for Apache Spark. + +## What are Broadcast Variables + +[Broadcast variables in Apache Spark](https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#broadcast-variables) are a mechanism for sharing read-only variables across executors. They allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. + +### How to use broadcast variables in .NET for Apache Spark + +Broadcast variables are created from a variable `v` by calling `SparkContext.Broadcast(v)`. The broadcast variable is a wrapper around `v`, and its value can be accessed by calling the `Value()` method on it. + +Example: + +```csharp +string v = "Variable to be broadcasted"; +Broadcast<string> bv = SparkContext.Broadcast(v); + +// Using the broadcast variable in a UDF: +Func<Column, Column> udf = Udf<string, string>( + str => $"{str}: {bv.Value()}"); +``` + +The type of the broadcast variable is captured by using generics in C#, as can be seen in the above example. + +### Deleting broadcast variables + +The broadcast variable can be deleted from all executors by calling the `Destroy()` function on it. 
+ +```csharp +// Destroying the broadcast variable bv: +bv.Destroy(); +``` + +> Note: `Destroy` deletes all data and metadata related to the broadcast variable. Use this with caution: once a broadcast variable has been destroyed, it cannot be used again. + +#### Caveat of using Destroy + +One important thing to keep in mind while using broadcast variables in UDFs is to limit the scope of the variable to only the UDF that is referencing it. The [guide to using UDFs](udf-guide.md) describes this phenomenon in detail. This is especially crucial when calling `Destroy` on the broadcast variable. If the broadcast variable that has been destroyed is visible to or accessible from other UDFs, it gets picked up for serialization by all those UDFs, even if it is not being referenced by them. This will throw an error as .NET for Apache Spark is not able to serialize the destroyed broadcast variable. + +Example to demonstrate: + +```csharp +string v = "Variable to be broadcasted"; +Broadcast<string> bv = SparkContext.Broadcast(v); + +// Using the broadcast variable in a UDF: +Func<Column, Column> udf1 = Udf<string, string>( + str => $"{str}: {bv.Value()}"); + +// Destroying bv +bv.Destroy(); + +// Calling udf1 after destroying bv throws the following expected exception: +// org.apache.spark.SparkException: Attempted to use Broadcast(0) after it was destroyed +df.Select(udf1(df["_1"])).Show(); + +// Different UDF udf2 that is not referencing bv +Func<Column, Column> udf2 = Udf<string, string>( + str => $"{str}: not referencing broadcast variable"); + +// Calling udf2 throws the following (unexpected) exception: +// [Error] [JvmBridge] org.apache.spark.SparkException: Task not serializable +df.Select(udf2(df["_1"])).Show(); +``` + +The recommended way of implementing the above desired behavior: + +```csharp +string v = "Variable to be broadcasted"; +// Restricting the visibility of bv to only the UDF referencing it +{ + Broadcast<string> bv = SparkContext.Broadcast(v); + + // Using the broadcast variable in a UDF: + Func<Column, Column> udf1 = Udf<string, string>( + str => $"{str}: 
{bv.Value()}"); + + // Destroying bv + bv.Destroy(); +} + +// Different UDF udf2 that is not referencing bv +Func<Column, Column> udf2 = Udf<string, string>( + str => $"{str}: not referencing broadcast variable"); + +// Calling udf2 works fine as expected +df.Select(udf2(df["_1"])).Show(); +``` + This ensures that destroying `bv` doesn't affect calling `udf2` because of unexpected serialization behavior. + + Broadcast variables are very useful for transmitting read-only data to all executors, as the data is sent only once and this gives huge performance benefits when compared with using local variables that get shipped to the executors with each task. Please refer to the [official documentation](https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#broadcast-variables) to get a deeper understanding of broadcast variables and why they are used. \ No newline at end of file diff --git a/docs/udf-guide.md b/docs/udf-guide.md new file mode 100644 index 000000000..bb308815d --- /dev/null +++ b/docs/udf-guide.md @@ -0,0 +1,172 @@ +# Guide to User-Defined Functions (UDFs) + +This is a guide to show how to use UDFs in .NET for Apache Spark. + +## What are UDFs + +[User-Defined Functions (UDFs)](https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/expressions/UserDefinedFunction.html) are a feature of Spark that allows developers to use custom functions to extend the system's built-in functionality. They transform values from a single row within a table to produce a single corresponding output value per row based on the logic defined in the UDF. 
+ +Let's take the following as an example for a UDF definition: + +```csharp +string s1 = "hello"; +Func<Column, Column> udf = Udf<string, string>( + str => $"{s1} {str}"); + +``` +The above defined UDF takes a `string` as an input (in the form of a [Column](https://github.com/dotnet/spark/blob/master/src/csharp/Microsoft.Spark/Sql/Column.cs#L14) of a [Dataframe](https://github.com/dotnet/spark/blob/master/src/csharp/Microsoft.Spark/Sql/DataFrame.cs#L24)), and returns a `string` with `hello` prepended to the input. + +For a sample Dataframe, let's take the following Dataframe `df`: + +```text ++-------+ +| name| ++-------+ +|Michael| +| Andy| +| Justin| ++-------+ +``` + +Now let's apply the above defined `udf` to the dataframe `df`: + +```csharp +DataFrame udfResult = df.Select(udf(df["name"])); +``` + +This would return the below as the Dataframe `udfResult`: + +```text ++-------------+ +| name| ++-------------+ +|hello Michael| +| hello Andy| +| hello Justin| ++-------------+ +``` +To get a better understanding of how to implement UDFs, please take a look at the [UDF helper functions](https://github.com/dotnet/spark/blob/master/src/csharp/Microsoft.Spark/Sql/Functions.cs#L3616) and some [test examples](https://github.com/dotnet/spark/blob/master/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfSimpleTypesTests.cs#L49). + +## UDF serialization + +Since UDFs are functions that need to be executed on the workers, they have to be serialized and sent to the workers as part of the payload from the driver. This involves serializing the [delegate](https://docs.microsoft.com/en-us/dotnet/csharp/programming-guide/delegates/), which is a reference to the method, along with its [target](https://docs.microsoft.com/en-us/dotnet/api/system.delegate.target?view=netframework-4.8), which is the class instance on which the current delegate invokes the instance method. 
Please take a look at this [code](https://github.com/dotnet/spark/blob/master/src/csharp/Microsoft.Spark/Utils/CommandSerDe.cs#L149) to get a better understanding of how UDF serialization is being done. + +## Good to know while implementing UDFs + +One behavior to be aware of while implementing UDFs in .NET for Apache Spark is how the target of the UDF gets serialized. .NET for Apache Spark uses .NET Core, which does not support serializing delegates, so it is instead done by using reflection to serialize the target where the delegate is defined. When multiple delegates are defined in a common scope, they have a shared closure that becomes the target of reflection for serialization. Let's take an example to illustrate what that means. + +The following code snippet defines two string variables that are being referenced in two function delegates that simply return the respective strings as the result: + +```csharp +using System; + +public class C { + public void M() { + string s1 = "s1"; + string s2 = "s2"; + Func<string, string> a = str => s1; + Func<string, string> b = str => s2; + } +} +``` + +The above C# code generates the following C# disassembly from the compiler (credit source: [sharplab.io](https://sharplab.io)): + +```csharp +public class C +{ + [CompilerGenerated] + private sealed class <>c__DisplayClass0_0 + { + public string s1; + + public string s2; + + internal string <M>b__0(string str) + { + return s1; + } + + internal string <M>b__1(string str) + { + return s2; + } + } + + public void M() + { + <>c__DisplayClass0_0 <>c__DisplayClass0_ = new <>c__DisplayClass0_0(); + <>c__DisplayClass0_.s1 = "s1"; + <>c__DisplayClass0_.s2 = "s2"; + Func<string, string> func = new Func<string, string>(<>c__DisplayClass0_.<M>b__0); + Func<string, string> func2 = new Func<string, string>(<>c__DisplayClass0_.<M>b__1); + } +} +``` +As can be seen in the above decompiled code, both `func` and `func2` share the same closure `<>c__DisplayClass0_0`, which is the target that is serialized when serializing the delegates `func` and `func2`. 
Hence, even though `Func<string, string> a` is only referencing `s1`, `s2` also gets serialized when sending over the bytes to the workers. + +This can lead to some unexpected behaviors at runtime (like in the case of using [broadcast variables](broadcast-guide.md)), which is why we recommend restricting the visibility of the variables used in a function to that function's scope. +Taking the above example to better explain what that means: + +Recommended user code to implement the desired behavior of the previous code snippet: + +```csharp +using System; + +public class C { + public void M() { + { + string s1 = "s1"; + Func<string, string> a = str => s1; + } + { + string s2 = "s2"; + Func<string, string> b = str => s2; + } + } +} +``` + +The above C# code generates the following C# disassembly from the compiler (credit source: [sharplab.io](https://sharplab.io)): + +```csharp +public class C +{ + [CompilerGenerated] + private sealed class <>c__DisplayClass0_0 + { + public string s1; + + internal string <M>b__0(string str) + { + return s1; + } + } + + [CompilerGenerated] + private sealed class <>c__DisplayClass0_1 + { + public string s2; + + internal string <M>b__1(string str) + { + return s2; + } + } + + public void M() + { + <>c__DisplayClass0_0 <>c__DisplayClass0_ = new <>c__DisplayClass0_0(); + <>c__DisplayClass0_.s1 = "s1"; + Func<string, string> func = new Func<string, string>(<>c__DisplayClass0_.<M>b__0); + <>c__DisplayClass0_1 <>c__DisplayClass0_2 = new <>c__DisplayClass0_1(); + <>c__DisplayClass0_2.s2 = "s2"; + Func<string, string> func2 = new Func<string, string>(<>c__DisplayClass0_2.<M>b__1); + } +} +``` + +Here we see that `func` and `func2` no longer share a closure and have their own separate closures `<>c__DisplayClass0_0` and `<>c__DisplayClass0_1` respectively. When used as the target for serialization, nothing other than the referenced variables will get serialized for the delegate. + +The above behavior is important to keep in mind while implementing multiple UDFs in a common scope. 
+To learn more about UDFs in general, please review the following articles that explain UDFs and how to use them: [UDFs in databricks(scala)](https://docs.databricks.com/spark/latest/spark-sql/udf-scala.html), [Spark UDFs and some gotchas](https://medium.com/@achilleus/spark-udfs-we-can-use-them-but-should-we-use-them-2c5a561fde6d). \ No newline at end of file From 4ef693dbf7616b738a6ae70d1e9dc8c12dd8e5d3 Mon Sep 17 00:00:00 2001 From: Niharika Dutta Date: Sun, 19 Apr 2020 22:32:56 -0700 Subject: [PATCH 2/5] removing guides from master --- docs/broadcast-guide.md | 92 --------------------- docs/udf-guide.md | 172 ---------------------------------------- 2 files changed, 264 deletions(-) delete mode 100644 docs/broadcast-guide.md delete mode 100644 docs/udf-guide.md diff --git a/docs/broadcast-guide.md b/docs/broadcast-guide.md deleted file mode 100644 index 4286c569e..000000000 --- a/docs/broadcast-guide.md +++ /dev/null @@ -1,92 +0,0 @@ -# Guide to using Broadcast Variables - -This is a guide to show how to use broadcast variables in .NET for Apache Spark. - -## What are Broadcast Variables - -[Broadcast variables in Apache Spark](https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#broadcast-variables) are a mechanism for sharing variables across executors that are meant to be read-only. They allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. - -### How to use broadcast variables in .NET for Apache Spark - -Broadcast variables are created from a variable `v` by calling `SparkContext.Broadcast(v)`. The broadcast variable is a wrapper around `v`, and its value can be accessed by calling the `Value()` method on it. 
- -Example: - -```csharp -string v = "Variable to be broadcasted"; -Broadcast bv = SparkContext.Broadcast(v); - -// Using the broadcast variable in a UDF: -Func udf = Udf( - str => $"{str}: {bv.Value()}"); -``` - -The type of broadcast variable is captured by using Generics in C#, as can be seen in the above example. - -### Deleting broadcast variables - -The broadcast variable can be deleted from all executors by calling the `Destroy()` function on it. - -```csharp -// Destroying the broadcast variable bv: -bv.Destroy(); -``` - -> Note: `Destroy` deletes all data and metadata related to the broadcast variable. Use this with caution- once a broadcast variable has been destroyed, it cannot be used again. - -#### Caveat of using Destroy - -One important thing to keep in mind while using broadcast variables in UDFs is to limit the scope of the variable to only the UDF that is referencing it. The [guide to using UDFs](udf-guide.md) describes this phenomenon in detail. This is especially crucial when calling `Destroy` on the broadcast variable. If the broadcast variable that has been destroyed is visible to or accessible from other UDFs, it gets picked up for serialization by all those UDFs, even if it is not being referenced by them. This will throw an error as .NET for Apache Spark is not able to serialize the destroyed broadcast variable. 
- -Example to demonstrate: - -```csharp -string v = "Variable to be broadcasted"; -Broadcast bv = SparkContext.Broadcast(v); - -// Using the broadcast variable in a UDF: -Func udf1 = Udf( - str => $"{str}: {bv.Value()}"); - -// Destroying bv -bv.Destroy(); - -// Calling udf1 after destroying bv throws the following expected exception: -// org.apache.spark.SparkException: Attempted to use Broadcast(0) after it was destroyed -df.Select(udf1(df["_1"])).Show(); - -// Different UDF udf2 that is not referencing bv -Func udf2 = Udf( - str => $"{str}: not referencing broadcast variable"); - -// Calling udf2 throws the following (unexpected) exception: -// [Error] [JvmBridge] org.apache.spark.SparkException: Task not serializable -df.Select(udf2(df["_1"])).Show(); -``` - -The recommended way of implementing above desired behavior: - -```csharp -string v = "Variable to be broadcasted"; -// Restricting the visibility of bv to only the UDF referencing it -{ - Broadcast bv = SparkContext.Broadcast(v); - - // Using the broadcast variable in a UDF: - Func udf1 = Udf( - str => $"{str}: {bv.Value()}"); - - // Destroying bv - bv.Destroy(); -} - -// Different UDF udf2 that is not referencing bv -Func udf2 = Udf( - str => $"{str}: not referencing broadcast variable"); - -// Calling udf2 works fine as expected -df.Select(udf2(df["_1"])).Show(); -``` - This ensures that destroying `bv` doesn't affect calling `udf2` because of unexpected serialization behavior. - - Broadcast variables are very useful for transmitting read-only data to all executors, as the data is sent only once and this gives huge performance benefits when compared with using local variables that get shipped to the executors with each task. Please refer to the [official documentation](https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#broadcast-variables) to get a deeper understanding of broadcast variables and why they are used. 
\ No newline at end of file diff --git a/docs/udf-guide.md b/docs/udf-guide.md deleted file mode 100644 index bb308815d..000000000 --- a/docs/udf-guide.md +++ /dev/null @@ -1,172 +0,0 @@ -# Guide to User-Defined Functions (UDFs) - -This is a guide to show how to use UDFs in .NET for Apache Spark. - -## What are UDFs - -[User-Defined Functions (UDFs)](https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/expressions/UserDefinedFunction.html) are a feature of Spark that allow developers to use custom functions to extend the system's built-in functionality. They transform values from a single row within a table to produce a single corresponding output value per row based on the logic defined in the UDF. - -Let's take the following as an example for a UDF definition: - -```csharp -string s1 = "hello"; -Func udf = Udf( - str => $"{s1} {str}"); - -``` -The above defined UDF takes a `string` as an input (in the form of a [Column](https://github.com/dotnet/spark/blob/master/src/csharp/Microsoft.Spark/Sql/Column.cs#L14) of a [Dataframe](https://github.com/dotnet/spark/blob/master/src/csharp/Microsoft.Spark/Sql/DataFrame.cs#L24)), and returns a `string` with `hello` appended in front of the input. 
- -For a sample Dataframe, let's take the following Dataframe `df`: - -```text -+-------+ -| name| -+-------+ -|Michael| -| Andy| -| Justin| -+-------+ -``` - -Now let's apply the above defined `udf` to the dataframe `df`: - -```csharp -DataFrame udfResult = df.Select(udf(df["name"])); -``` - -This would return the below as the Dataframe `udfResult`: - -```text -+-------------+ -| name| -+-------------+ -|hello Michael| -| hello Andy| -| hello Justin| -+-------------+ -``` -To get a better understanding of how to implement UDFs, please take a look at the [UDF helper functions](https://github.com/dotnet/spark/blob/master/src/csharp/Microsoft.Spark/Sql/Functions.cs#L3616) and some [test examples](https://github.com/dotnet/spark/blob/master/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfSimpleTypesTests.cs#L49). - -## UDF serialization - -Since UDFs are functions that need to be executed on the workers, they have to be serialized and sent to the workers as part of the payload from the driver. This involves serializing the [delegate](https://docs.microsoft.com/en-us/dotnet/csharp/programming-guide/delegates/) which is a reference to the method, along with its [target](https://docs.microsoft.com/en-us/dotnet/api/system.delegate.target?view=netframework-4.8) which is the class instance on which the current delegate invokes the instance method. Please take a look at this [code](https://github.com/dotnet/spark/blob/master/src/csharp/Microsoft.Spark/Utils/CommandSerDe.cs#L149) to get a better understanding of how UDF serialization is being done. - -## Good to know while implementing UDFs - -One behavior to be aware of while implementing UDFs in .NET for Apache Spark is how the target of the UDF gets serialized. .NET for Apache Spark uses .NET Core, which does not support serializing delegates, so it is instead done by using reflection to serialize the target where the delegate is defined. 
When multiple delegates are defined in a common scope, they have a shared closure that becomes the target of reflection for serialization. Let's take an example to illustrate what that means. - -The following code snippet defines two string variables that are being referenced in two function delegates, that just return the respective strings as result: - -```csharp -using System; - -public class C { - public void M() { - string s1 = "s1"; - string s2 = "s2"; - Func a = str => s1; - Func b = str => s2; - } -} -``` - -The above C# code generates the following C# disassembly (credit source: [sharplab.io](sharplab.io)) code from the compiler: - -```csharp -public class C -{ - [CompilerGenerated] - private sealed class <>c__DisplayClass0_0 - { - public string s1; - - public string s2; - - internal string b__0(string str) - { - return s1; - } - - internal string b__1(string str) - { - return s2; - } - } - - public void M() - { - <>c__DisplayClass0_0 <>c__DisplayClass0_ = new <>c__DisplayClass0_0(); - <>c__DisplayClass0_.s1 = "s1"; - <>c__DisplayClass0_.s2 = "s2"; - Func func = new Func(<>c__DisplayClass0_.b__0); - Func func2 = new Func(<>c__DisplayClass0_.b__1); - } -} -``` -As can be seen in the above IL code, both `func` and `func2` share the same closure `<>c__DisplayClass0_0`, which is the target that is serialized when serializing the delegates `func` and `func2`. Hence, even though `Func a` is only referencing `s1`, `s2` also gets serialized when sending over the bytes to the workers. - -This can lead to some unexpected behaviors at runtime (like in the case of using [broadcast variables](broadcast-guide.md)), which is why we recommend restricting the visibility of the variables used in a function to that function's scope. 
-Taking the above example to better explain what that means: - -Recommended user code to implement desired behavior of previous code snippet: - -```csharp -using System; - -public class C { - public void M() { - { - string s1 = "s1"; - Func a = str => s1; - } - { - string s2 = "s2"; - Func b = str => s2; - } - } -} -``` - -The above C# code generates the following C# disassembly (credit source: [sharplab.io](sharplab.io)) code from the compiler: - -```csharp -public class C -{ - [CompilerGenerated] - private sealed class <>c__DisplayClass0_0 - { - public string s1; - - internal string b__0(string str) - { - return s1; - } - } - - [CompilerGenerated] - private sealed class <>c__DisplayClass0_1 - { - public string s2; - - internal string b__1(string str) - { - return s2; - } - } - - public void M() - { - <>c__DisplayClass0_0 <>c__DisplayClass0_ = new <>c__DisplayClass0_0(); - <>c__DisplayClass0_.s1 = "s1"; - Func func = new Func(<>c__DisplayClass0_.b__0); - <>c__DisplayClass0_1 <>c__DisplayClass0_2 = new <>c__DisplayClass0_1(); - <>c__DisplayClass0_2.s2 = "s2"; - Func func2 = new Func(<>c__DisplayClass0_2.b__1); - } -} -``` - -Here we see that `func` and `func2` no longer share a closure and have their own separate closures `<>c__DisplayClass0_0` and `<>c__DisplayClass0_1` respectively. When used as the target for serialization, nothing other than the referenced variables will get serialized for the delegate. - -This above behavior is important to keep in mind while implementing multiple UDFs in a common scope. -To learn more about UDFs in general, please review the following articles that explain UDFs and how to use them: [UDFs in databricks(scala)](https://docs.databricks.com/spark/latest/spark-sql/udf-scala.html), [Spark UDFs and some gotchas](https://medium.com/@achilleus/spark-udfs-we-can-use-them-but-should-we-use-them-2c5a561fde6d). 
\ No newline at end of file From f564495fe0a770d8314f0ea54802eb1a28bb40e7 Mon Sep 17 00:00:00 2001 From: Niharika Dutta Date: Sun, 21 Jun 2020 23:56:44 -0700 Subject: [PATCH 3/5] Marking classes as serializable to fix the Broadcast serialization issue --- src/csharp/Microsoft.Spark/Interop/Ipc/JvmObjectReference.cs | 1 + src/csharp/Microsoft.Spark/SparkConf.cs | 1 + src/csharp/Microsoft.Spark/SparkContext.cs | 2 ++ src/csharp/Microsoft.Spark/Sql/Catalog/Catalog.cs | 2 ++ src/csharp/Microsoft.Spark/Sql/SparkSession.cs | 1 + 5 files changed, 7 insertions(+) diff --git a/src/csharp/Microsoft.Spark/Interop/Ipc/JvmObjectReference.cs b/src/csharp/Microsoft.Spark/Interop/Ipc/JvmObjectReference.cs index be19cce3d..4e5b1c0af 100644 --- a/src/csharp/Microsoft.Spark/Interop/Ipc/JvmObjectReference.cs +++ b/src/csharp/Microsoft.Spark/Interop/Ipc/JvmObjectReference.cs @@ -112,6 +112,7 @@ internal interface IJvmObjectReferenceProvider /// /// Reference to object created in JVM. /// + [Serializable] internal sealed class JvmObjectReference : IJvmObjectReferenceProvider { /// diff --git a/src/csharp/Microsoft.Spark/SparkConf.cs b/src/csharp/Microsoft.Spark/SparkConf.cs index 6608341b8..cb1eaaba5 100644 --- a/src/csharp/Microsoft.Spark/SparkConf.cs +++ b/src/csharp/Microsoft.Spark/SparkConf.cs @@ -17,6 +17,7 @@ namespace Microsoft.Spark /// Note that once a SparkConf object is passed to Spark, it is cloned and can no longer be /// modified by the user. Spark does not support modifying the configuration at runtime. /// + [Serializable] public sealed class SparkConf : IJvmObjectReferenceProvider { private readonly JvmObjectReference _jvmObject; diff --git a/src/csharp/Microsoft.Spark/SparkContext.cs b/src/csharp/Microsoft.Spark/SparkContext.cs index f911517ef..8d8c713f8 100644 --- a/src/csharp/Microsoft.Spark/SparkContext.cs +++ b/src/csharp/Microsoft.Spark/SparkContext.cs @@ -2,6 +2,7 @@ // The .NET Foundation licenses this file to you under the MIT license. 
// See the LICENSE file in the project root for more information. +using System; using System.Collections.Generic; using System.IO; using System.Runtime.Serialization.Formatters.Binary; @@ -18,6 +19,7 @@ namespace Microsoft.Spark /// Only one `SparkContext` should be active per JVM. You must `stop()` the /// active `SparkContext` before creating a new one. /// + [Serializable] public sealed class SparkContext : IJvmObjectReferenceProvider { private readonly JvmObjectReference _jvmObject; diff --git a/src/csharp/Microsoft.Spark/Sql/Catalog/Catalog.cs b/src/csharp/Microsoft.Spark/Sql/Catalog/Catalog.cs index 2175ddb07..c82fb4aab 100644 --- a/src/csharp/Microsoft.Spark/Sql/Catalog/Catalog.cs +++ b/src/csharp/Microsoft.Spark/Sql/Catalog/Catalog.cs @@ -2,6 +2,7 @@ // The .NET Foundation licenses this file to you under the MIT license. // See the LICENSE file in the project root for more information. +using System; using Microsoft.Spark.Interop.Ipc; namespace Microsoft.Spark.Sql.Catalog @@ -9,6 +10,7 @@ namespace Microsoft.Spark.Sql.Catalog /// /// Catalog interface for Spark. To access this, use SparkSession.Catalog. /// + [Serializable] public sealed class Catalog : IJvmObjectReferenceProvider { private readonly JvmObjectReference _jvmObject; diff --git a/src/csharp/Microsoft.Spark/Sql/SparkSession.cs b/src/csharp/Microsoft.Spark/Sql/SparkSession.cs index fc706081f..22a06082c 100644 --- a/src/csharp/Microsoft.Spark/Sql/SparkSession.cs +++ b/src/csharp/Microsoft.Spark/Sql/SparkSession.cs @@ -17,6 +17,7 @@ namespace Microsoft.Spark.Sql /// /// The entry point to programming Spark with the Dataset and DataFrame API. 
/// + [Serializable] public sealed class SparkSession : IDisposable, IJvmObjectReferenceProvider { private readonly JvmObjectReference _jvmObject; From b7619982962d7d059d790184d293595b37b2a5cd Mon Sep 17 00:00:00 2001 From: Niharika Dutta Date: Mon, 22 Jun 2020 21:09:14 -0700 Subject: [PATCH 4/5] Marking necessary classes as serializable to make broadcast work in notebook scenario --- src/csharp/Microsoft.Spark/Interop/Ipc/JvmBridge.cs | 4 ++++ src/csharp/Microsoft.Spark/Interop/Ipc/JvmObjectReference.cs | 2 ++ src/csharp/Microsoft.Spark/SparkConf.cs | 1 - src/csharp/Microsoft.Spark/SparkContext.cs | 2 +- src/csharp/Microsoft.Spark/Sql/Catalog/Catalog.cs | 1 - src/csharp/Microsoft.Spark/Sql/SparkSession.cs | 2 ++ 6 files changed, 9 insertions(+), 3 deletions(-) diff --git a/src/csharp/Microsoft.Spark/Interop/Ipc/JvmBridge.cs b/src/csharp/Microsoft.Spark/Interop/Ipc/JvmBridge.cs index abfa63b19..0dade2e25 100644 --- a/src/csharp/Microsoft.Spark/Interop/Ipc/JvmBridge.cs +++ b/src/csharp/Microsoft.Spark/Interop/Ipc/JvmBridge.cs @@ -18,6 +18,7 @@ namespace Microsoft.Spark.Interop.Ipc /// Using a concurrent socket connection queue (lightweight synchronization mechanism) /// supporting async JVM calls like StreamingContext.AwaitTermination() /// + [Serializable] internal sealed class JvmBridge : IJvmBridge { // TODO: On .NET Core 2.1, Span could be used with a stack-based @@ -28,10 +29,13 @@ internal sealed class JvmBridge : IJvmBridge [ThreadStatic] private static object[] s_twoArgArray; [ThreadStatic] + [NonSerialized] private static MemoryStream s_payloadMemoryStream; + [NonSerialized] private readonly ConcurrentQueue _sockets = new ConcurrentQueue(); + [NonSerialized] private readonly ILoggerService _logger = LoggerServiceFactory.GetLogger(typeof(JvmBridge)); private readonly int _portNumber; diff --git a/src/csharp/Microsoft.Spark/Interop/Ipc/JvmObjectReference.cs b/src/csharp/Microsoft.Spark/Interop/Ipc/JvmObjectReference.cs index 4e5b1c0af..b4bf857ea 100644 --- 
a/src/csharp/Microsoft.Spark/Interop/Ipc/JvmObjectReference.cs +++ b/src/csharp/Microsoft.Spark/Interop/Ipc/JvmObjectReference.cs @@ -14,6 +14,7 @@ namespace Microsoft.Spark.Interop.Ipc /// The reason for having another layer on top of string id is /// so that JvmObjectReference can be copied. /// + [Serializable] internal sealed class JvmObjectId { private static readonly ILoggerService s_logger = @@ -150,6 +151,7 @@ internal JvmObjectReference(JvmObjectReference other) /// /// An unique identifier for an object created on the JVM. /// + /// internal JvmObjectId Id { get; } /// diff --git a/src/csharp/Microsoft.Spark/SparkConf.cs b/src/csharp/Microsoft.Spark/SparkConf.cs index cb1eaaba5..6608341b8 100644 --- a/src/csharp/Microsoft.Spark/SparkConf.cs +++ b/src/csharp/Microsoft.Spark/SparkConf.cs @@ -17,7 +17,6 @@ namespace Microsoft.Spark /// Note that once a SparkConf object is passed to Spark, it is cloned and can no longer be /// modified by the user. Spark does not support modifying the configuration at runtime. /// - [Serializable] public sealed class SparkConf : IJvmObjectReferenceProvider { private readonly JvmObjectReference _jvmObject; diff --git a/src/csharp/Microsoft.Spark/SparkContext.cs b/src/csharp/Microsoft.Spark/SparkContext.cs index 8d8c713f8..6f8357290 100644 --- a/src/csharp/Microsoft.Spark/SparkContext.cs +++ b/src/csharp/Microsoft.Spark/SparkContext.cs @@ -19,11 +19,11 @@ namespace Microsoft.Spark /// Only one `SparkContext` should be active per JVM. You must `stop()` the /// active `SparkContext` before creating a new one. 
/// - [Serializable] public sealed class SparkContext : IJvmObjectReferenceProvider { private readonly JvmObjectReference _jvmObject; + [NonSerialized] private readonly SparkConf _conf; /// diff --git a/src/csharp/Microsoft.Spark/Sql/Catalog/Catalog.cs b/src/csharp/Microsoft.Spark/Sql/Catalog/Catalog.cs index c82fb4aab..6aa17313c 100644 --- a/src/csharp/Microsoft.Spark/Sql/Catalog/Catalog.cs +++ b/src/csharp/Microsoft.Spark/Sql/Catalog/Catalog.cs @@ -10,7 +10,6 @@ namespace Microsoft.Spark.Sql.Catalog /// /// Catalog interface for Spark. To access this, use SparkSession.Catalog. /// - [Serializable] public sealed class Catalog : IJvmObjectReferenceProvider { private readonly JvmObjectReference _jvmObject; diff --git a/src/csharp/Microsoft.Spark/Sql/SparkSession.cs b/src/csharp/Microsoft.Spark/Sql/SparkSession.cs index 22a06082c..4f7b58cdc 100644 --- a/src/csharp/Microsoft.Spark/Sql/SparkSession.cs +++ b/src/csharp/Microsoft.Spark/Sql/SparkSession.cs @@ -22,7 +22,9 @@ public sealed class SparkSession : IDisposable, IJvmObjectReferenceProvider { private readonly JvmObjectReference _jvmObject; + [NonSerialized] private readonly Lazy _sparkContext; + [NonSerialized] private readonly Lazy _catalog; private static readonly string s_sparkSessionClassName = From 6b9825e5f93161e36afb0c6f8921efbc124ecd0a Mon Sep 17 00:00:00 2001 From: Niharika Dutta Date: Tue, 23 Jun 2020 02:29:20 -0700 Subject: [PATCH 5/5] Marking only SparkSession as serializable --- src/csharp/Microsoft.Spark/Interop/Ipc/JvmBridge.cs | 4 ---- src/csharp/Microsoft.Spark/Interop/Ipc/JvmObjectReference.cs | 2 -- src/csharp/Microsoft.Spark/Sql/SparkSession.cs | 1 + 3 files changed, 1 insertion(+), 6 deletions(-) diff --git a/src/csharp/Microsoft.Spark/Interop/Ipc/JvmBridge.cs b/src/csharp/Microsoft.Spark/Interop/Ipc/JvmBridge.cs index 0dade2e25..abfa63b19 100644 --- a/src/csharp/Microsoft.Spark/Interop/Ipc/JvmBridge.cs +++ b/src/csharp/Microsoft.Spark/Interop/Ipc/JvmBridge.cs @@ -18,7 +18,6 @@ namespace 
Microsoft.Spark.Interop.Ipc /// Using a concurrent socket connection queue (lightweight synchronization mechanism) /// supporting async JVM calls like StreamingContext.AwaitTermination() /// - [Serializable] internal sealed class JvmBridge : IJvmBridge { // TODO: On .NET Core 2.1, Span could be used with a stack-based @@ -29,13 +28,10 @@ internal sealed class JvmBridge : IJvmBridge [ThreadStatic] private static object[] s_twoArgArray; [ThreadStatic] - [NonSerialized] private static MemoryStream s_payloadMemoryStream; - [NonSerialized] private readonly ConcurrentQueue _sockets = new ConcurrentQueue(); - [NonSerialized] private readonly ILoggerService _logger = LoggerServiceFactory.GetLogger(typeof(JvmBridge)); private readonly int _portNumber; diff --git a/src/csharp/Microsoft.Spark/Interop/Ipc/JvmObjectReference.cs b/src/csharp/Microsoft.Spark/Interop/Ipc/JvmObjectReference.cs index b4bf857ea..3527b2fb2 100644 --- a/src/csharp/Microsoft.Spark/Interop/Ipc/JvmObjectReference.cs +++ b/src/csharp/Microsoft.Spark/Interop/Ipc/JvmObjectReference.cs @@ -14,7 +14,6 @@ namespace Microsoft.Spark.Interop.Ipc /// The reason for having another layer on top of string id is /// so that JvmObjectReference can be copied. /// - [Serializable] internal sealed class JvmObjectId { private static readonly ILoggerService s_logger = @@ -113,7 +112,6 @@ internal interface IJvmObjectReferenceProvider /// /// Reference to object created in JVM. 
/// - [Serializable] internal sealed class JvmObjectReference : IJvmObjectReferenceProvider { /// diff --git a/src/csharp/Microsoft.Spark/Sql/SparkSession.cs b/src/csharp/Microsoft.Spark/Sql/SparkSession.cs index 4f7b58cdc..0c96e7a08 100644 --- a/src/csharp/Microsoft.Spark/Sql/SparkSession.cs +++ b/src/csharp/Microsoft.Spark/Sql/SparkSession.cs @@ -20,6 +20,7 @@ namespace Microsoft.Spark.Sql [Serializable] public sealed class SparkSession : IDisposable, IJvmObjectReferenceProvider { + [NonSerialized] private readonly JvmObjectReference _jvmObject; [NonSerialized]