Running Spark operators in Python keeps throwing an error. How do I fix it?


Running Spark operators in Python keeps throwing an error. I am new to this; could anyone point me to how to fix it?

24/06/17 16:31:58 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.net.SocketException: Connection reset

at java.base/sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:328)
at java.base/sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:355)
at java.base/sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:808)
at java.base/java.net.Socket$SocketInputStream.read(Socket.java:966)
at java.base/java.io.BufferedInputStream.fill(BufferedInputStream.java:244)
at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:263)
at java.base/java.io.DataInputStream.readInt(DataInputStream.java:393)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:757)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:749)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:514)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:259)
at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:265)
at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:576)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
at org.apache.spark.scheduler.Task.run(Task.scala:139)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:842)

24/06/17 16:31:58 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0) (DESKTOP-L1K769I executor driver): java.net.SocketException: Connection reset

at java.base/sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:328)
at java.base/sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:355)
at java.base/sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:808)
at java.base/java.net.Socket$SocketInputStream.read(Socket.java:966)
at java.base/java.io.BufferedInputStream.fill(BufferedInputStream.java:244)
at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:263)
at java.base/java.io.DataInputStream.readInt(DataInputStream.java:393)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:757)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:749)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:514)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:259)
at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:265)
at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:576)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
at org.apache.spark.scheduler.Task.run(Task.scala:139)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:842).....
1 Answer
test
2024-06-18

The java.net.SocketException: Connection reset error is usually caused by a network problem or by a misconfigured Spark environment. Some things to try:

1. Check the network configuration and make sure a firewall is not blocking the connection.

2. Increase the Spark executor's memory and core count, for example:

from pyspark import SparkConf, SparkContext

conf = SparkConf() \
    .setAppName("YourAppName") \
    .setMaster("local[*]") \
    .set("spark.executor.memory", "4g") \
    .set("spark.executor.cores", "2") \
    .set("spark.driver.memory", "4g")

sc = SparkContext(conf=conf)
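
As a quick sanity check, you can read back the configuration the context actually picked up (a minimal sketch, assuming the sc created above):

# Print every (key, value) pair from the live SparkContext so you can
# confirm the memory and core settings were actually applied.
for key, value in sc.getConf().getAll():
    print(key, "=", value)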

3. Tune Spark's network-related parameters. Note that spark.executor.heartbeatInterval should stay well below spark.network.timeout, and both must be set on the SparkConf before the SparkContext is created:

conf.set("spark.network.timeout", "600s")
conf.set("spark.executor.heartbeatInterval", "100s")

4. Increase the parallelism of data processing:

rdd = sc.textFile("hdfs://path/to/your/file").repartition(100)
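
To confirm the repartition took effect, you can compare partition counts before and after (a minimal sketch, reusing the same hypothetical HDFS path):

rdd = sc.textFile("hdfs://path/to/your/file")
print(rdd.getNumPartitions())   # partition count as read from HDFS
rdd = rdd.repartition(100)
print(rdd.getNumPartitions())   # should now be 100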

5. Make sure the Python environment is identical on every cluster node and that the Python version is compatible with your Spark version (a small environment sketch follows the full example below).

6. Make sure the PySpark library version matches the Spark installation; when using PySpark, keep the cluster's Spark version and the pyspark package version in sync.

7. Putting it all together, an example SparkContext configuration:

from pyspark import SparkConf, SparkContext

conf = SparkConf() \
    .setAppName("YourAppName") \
    .setMaster("local[*]") \
    .set("spark.executor.memory", "4g") \
    .set("spark.executor.cores", "2") \
    .set("spark.driver.memory", "4g") \
    .set("spark.network.timeout", "600s") \
    .set("spark.executor.heartbeatInterval", "100s")

sc = SparkContext(conf=conf)

# Your Spark job code
rdd = sc.textFile("hdfs://path/to/your/file").repartition(100)
result = rdd.map(lambda x: x).collect()

print(result)
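
On point 5: on a single-machine Windows setup like the one in the question, a connection reset inside PythonRunner often means the Python worker process failed to start or crashed. One way to rule out an interpreter mismatch is to pin both the driver and the workers to the interpreter running your script via PYSPARK_PYTHON / PYSPARK_DRIVER_PYTHON before creating the context (a minimal sketch, not a guaranteed fix for this exact trace):

import os
import sys

from pyspark import SparkConf, SparkContext

# Point both the driver and the worker processes at the interpreter that is
# running this script, so Spark does not launch a different or missing Python.
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

conf = SparkConf().setAppName("YourAppName").setMaster("local[*]")
sc = SparkContext(conf=conf)

# A tiny job that exercises the Python workers end to end.
print(sc.parallelize(range(10)).map(lambda x: x * 2).collect())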