Spark- RDD持久化

 

官方原文:

RDD Persistence

One of the most important capabilities in Spark is persisting (or caching) a dataset in memory across operations. When you persist an RDD, each node stores any partitions of it that it computes in memory and reuses them in other actions on that dataset (or datasets derived from it). This allows future actions to be much faster (often by more than 10x). Caching is a key tool for iterative algorithms and fast interactive use.

You can mark an RDD to be persisted using the persist() or cache() methods on it. The first time it is computed in an action, it will be kept in memory on the nodes. Spark’s cache is fault-tolerant – if any partition of an RDD is lost, it will automatically be recomputed using the transformations that originally created it.

In addition, each persisted RDD can be stored using a different storage level, allowing you, for example, to persist the dataset on disk, persist it in memory but as serialized Java objects (to save space), replicate it across nodes, or store it off-heap in Tachyon. These levels are set by passing aStorageLevel object (ScalaJavaPython) to persist(). The cache() method is a shorthand for using the default storage level, which is StorageLevel.MEMORY_ONLY (store deserialized objects in memory). The full set of storage levels is:

Storage Level Meaning
MEMORY_ONLY Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they’re needed. This is the default level.
MEMORY_AND_DISK Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don’t fit on disk, and read them from there when they’re needed.
MEMORY_ONLY_SER Store RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read.
MEMORY_AND_DISK_SER Similar to MEMORY_ONLY_SER, but spill partitions that don’t fit in memory to disk instead of recomputing them on the fly each time they’re needed.
DISK_ONLY Store the RDD partitions only on disk.
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. Same as the levels above, but replicate each partition on two cluster nodes.
OFF_HEAP (experimental) Store RDD in serialized format in Tachyon. Compared to MEMORY_ONLY_SER, OFF_HEAP reduces garbage collection overhead and allows executors to be smaller and to share a pool of memory, making it attractive in environments with large heaps or multiple concurrent applications. Furthermore, as the RDDs reside in Tachyon, the crash of an executor does not lead to losing the in-memory cache. In this mode, the memory in Tachyon is discardable. Thus, Tachyon does not attempt to reconstruct a block that it evicts from memory. If you plan to use Tachyon as the off heap store, Spark is compatible with Tachyon out-of-the-box. Please refer to this page for the suggested version pairings.

Note: In Python, stored objects will always be serialized with the Pickle library, so it does not matter whether you choose a serialized level.

Spark also automatically persists some intermediate data in shuffle operations (e.g. reduceByKey), even without users calling persist. This is done to avoid recomputing the entire input if a node fails during the shuffle. We still recommend users call persist on the resulting RDD if they plan to reuse it.

Which Storage Level to Choose?

Spark’s storage levels are meant to provide different trade-offs between memory usage and CPU efficiency. We recommend going through the following process to select one:

  • If your RDDs fit comfortably with the default storage level (MEMORY_ONLY), leave them that way. This is the most CPU-efficient option, allowing operations on the RDDs to run as fast as possible.
  • If not, try using MEMORY_ONLY_SER and selecting a fast serialization library to make the objects much more space-efficient, but still reasonably fast to access.
  • Don’t spill to disk unless the functions that computed your datasets are expensive, or they filter a large amount of the data. Otherwise, recomputing a partition may be as fast as reading it from disk.
  • Use the replicated storage levels if you want fast fault recovery (e.g. if using Spark to serve requests from a web application). All the storage levels provide full fault tolerance by recomputing lost data, but the replicated ones let you continue running tasks on the RDD without waiting to recompute a lost partition.
  • In environments with high amounts of memory or multiple applications, the experimental OFF_HEAP mode has several advantages:
    • It allows multiple executors to share the same pool of memory in Tachyon.
    • It significantly reduces garbage collection costs.
    • Cached data is not lost if individual executors crash.

 

RDD持久性

Spark中最重要的功能之一是跨操作在内存中持久化(或缓存)数据集。当您持久保存RDD时,每个节点都会存储它在内存中计算的任何分区,并在该数据集(或从中派生的数据集)的其他操作中重用它们。这使得未来的行动更快(通常超过10倍)。缓存是迭代算法和快速交互使用的关键工具。

您可以使用persist()cache()方法标记要保留的RDD 。第一次在动作中计算它时,它将保留在节点的内存中。Spark的缓存是容错的 – 如果RDD的任何分区丢失,它将使用最初创建它的转换自动重新计算。

此外,每个持久化RDD可以使用不同的存储级别进行存储,例如,允许您将数据集保留在磁盘上,将其保留在内存中,但作为序列化Java对象(以节省空间),跨节点复制或存储它在Tachyon堆满了。通过传递StorageLevel对象(Scala, Java, Python)来设置这些级别 persist()。该cache()方法是使用默认存储级别的简写,即StorageLevel.MEMORY_ONLY(在内存中存储反序列化的对象)。完整的存储级别是:

存储级别 含义
MEMORY_ONLY 将RDD存储为JVM中的反序列化Java对象。如果RDD不适合内存,则某些分区将不会被缓存,并且每次需要时都会重新计算。这是默认级别。
MEMORY_AND_DISK 将RDD存储为JVM中的反序列化Java对象。如果RDD不适合内存,请存储不适合磁盘的分区,并在需要时从那里读取它们。
MEMORY_ONLY_SER 将RDD存储为序列化 Java对象(每个分区一个字节数组)。这通常比反序列化对象更节省空间,特别是在使用快速序列化器时,但读取CPU密集程度更高。
MEMORY_AND_DISK_SER 与MEMORY_ONLY_SER类似,但将不适合内存的分区溢出到磁盘,而不是每次需要时即时重新计算它们。
DISK_ONLY 仅将RDD分区存储在磁盘上。
MEMORY_ONLY_2,MEMORY_AND_DISK_2等 与上面的级别相同,但复制两个群集节点上的每个分区。
OFF_HEAP(实验性) Tachyon中以序列化格式存储RDD 。与MEMORY_ONLY_SER相比,OFF_HEAP减少了垃圾收集开销,并允许执行器更小并共享内存池,使其在具有大堆或多个并发应用程序的环境中具有吸引力。此外,由于RDD驻留在Tachyon中,执行程序的崩溃不会导致丢失内存缓存。在这种模式下,Tachyon中的内存是可丢弃的。因此,Tachyon不会尝试重建一个从记忆中驱逐的块。如果您打算使用Tachyon作为off堆存储,Spark可以与开箱即用的Tachyon兼容。 有关建议的版本配对,请参阅此页面

注意: 在Python中,存储对象将始终使用Pickle库进行序列化,因此选择序列化级别无关紧要。

reduceByKey即使没有用户调用,Spark也会在shuffle操作(例如)中自动保留一些中间数据persist。这样做是为了避免在shuffle期间节点出现故障时重新计算整个输入。我们仍然建议用户persist在计划重用RDD时调用生成的RDD。

选择哪种存储级别?

Spark的存储级别旨在提供内存使用和CPU效率之间的不同折衷。我们建议您通过以下流程选择一个:

  • 如果您的RDD与默认存储级别(MEMORY_ONLY)保持一致,请保持这种状态。这是CPU效率最高的选项,允许RDD上的操作尽可能快地运行。
  • 如果没有,请尝试使用MEMORY_ONLY_SER选择快速序列化库,以使对象更加节省空间,但仍然可以快速访问。
  • 除非计算数据集的函数很昂贵,否则它们不会溢出到磁盘,或者它们会过滤大量数据。否则,重新计算分区可能与从磁盘读取分区一样快。
  • 如果要快速故障恢复,请使用复制的存储级别(例如,如果使用Spark来处理来自Web应用程序的请求)。所有存储级别通过重新计算丢失的数据提供完全容错,但复制的存储级别允许您继续在RDD上运行任务,而无需等待重新计算丢失的分区。
  • 在具有大量内存或多个应用程序的环境中,实验OFF_HEAP 模式有几个优点:
    • 它允许多个执行程序在Tachyon中共享相同的内存池。
    • 它显着降低了垃圾收集成本。
    • 如果个别执行程序崩溃,缓存数据不会丢失。

 

复制代码
package cn.rzlee.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

/**
 * @Author ^_^
 * @Create 2018/11/3
 */
public class Persist {

    public static void main(String[] args) {

        SparkConf conf = new SparkConf().setAppName("Persist").setMaster("local[2]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> lines = sc.textFile("C:\\Users\\txdyl\\Desktop\\log\\in\\data.txt", 1).cache();
        long beginTime = System.currentTimeMillis();
        long count = lines.count();
        System.out.println(count);
        long endTime = System.currentTimeMillis();
        System.out.println("cost "+(endTime - beginTime) + "millisecond");


        beginTime = System.currentTimeMillis();
        count = lines.count();
        System.out.println(count);
        endTime = System.currentTimeMillis();
        System.out.println("cost "+(endTime - beginTime) + "millisecond");

        sc.close();
    }
}
复制代码

 

什么时候进行 cache?

1.要求的计算速度快
2.集群的资源要足够大
3.重要: cache的数据会多次的触发 Action
4.先进行过滤,然后将缩小范围的数据在 cache到内存

//第一个参数,放到磁盘
//第二个参数,放到内存
//第三个参数,磁盘中的数据,不是以java对象的方式保存
//第四个参数,内存中的数据,以java对象的方式保存

val MEMORY_AND_DISK = new StorageLevel(true true, false, true)
0FF_HEAP:堆外内存( Tachyon,分布式内存存储系统)
Alluxio-Open Source Memory Speed Virtual Distributed Storage

 

cache使用unpersist(true)释放掉缓存在内存中的数据。

什么时候做 checkpoint?

1.迭代计算,要求保证数据安全
2.对速度要求不高(跟 cache到内存进行对比)
3.将中间结果保存到hdfs

步骤:
//设置 checkpoint目录(分布式文件系统的目录hdfs目录)
//经过复杂进行,得到中间结果
//将中间结果 checkpoint到指定的hdfs目录
//后续的计算,就可以使用前面ck的数据了

 

 

为TA充电
共{{data.count}}人
人已赞赏
博客大数据

Spark- Action实战

2019-8-18 17:46:40

博客大数据

Spark- 共享变量

2019-8-18 17:50:21

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索