查看原文
其他

Scala中的IO操作及ArrayBuffer线程安全问题

大数据学习与分享 大数据学习与分享 2022-07-09

通过Scala对文件进行读写操作在实际业务中应用也比较多,这里介绍几种常用的方式,直接上代码:

1.从文件中读取内容
object Main { def loadData(): Array[String] = { var bs: BufferedSource = null var in: InputStream = null try { in = Main.getClass.getClassLoader.getResourceAsStream("data.txt") if (in == null) { in = new FileInputStream(new File("data.txt")) } bs = new BufferedSource(in) bs.getLines().toArray } finally { bs.close() } } //直接通过scala.io.Source进行读取 def testSource(): Unit = { Source.fromFile("data.txt").foreach(println) }
}
2.向文件中写内容
def write(): Unit ={ //调用的就是java中的io类 val writer = new PrintWriter(new File("write.txt" )) writer.write("scala write") writer.close()}
除了上述读写方式,也可以从"屏幕"上读取用户输入的指令来处理程序:
import scala.io. StdIndef printIn(): Unit = { print("please enter number :") val line = StdIn.readLine() println(s"number is : $line")}

相信使用Scala进行应用开发时,ArrayBuffer是经常使用的数组。对ArrayBuffer进行新增元素时,通常使用方法:+=。但是该方法并非线程安全,如果在多线程环境使用该方法,由于并发问题,很容报索引越界异常。
下述模拟多线程向定义的ArrayBuffer中并发插入100个元素:
def arrBuffer(): Unit = { //默认初始容量为16 val arrayBuffer = new ArrayBuffer[Int]()
val executors = Executors.newFixedThreadPool(100)
for (i <- 1 to 100) { executors.execute(new Runnable { override def run(): Unit = { arrayBuffer += i } }) }
executors.shutdown() }

执行上述程序,报出类似如下的索引越界问题:

java.lang.ArrayIndexOutOfBoundsException: 32 at scala.collection.mutable.ArrayBuffer.$plus$eq(ArrayBuffer.scala:85) at Main$$anonfun$main$1$$anon$1.run(Main.scala:24) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)
来看一下ArrayBuffer的+=实现源码:
//初始容量protected def initialSize: Int = 16//array默认长度为16protected var array: Array[AnyRef] = new Array[AnyRef](math.max(initialSize, 1))//元素个数,默认0protected var size0: Int = 0 def +=(elem: A): this.type = { ensureSize(size0 + 1) array(size0) = elem.asInstanceOf[AnyRef] size0 += 1 this}

val arrayBuffer = new ArrayBuffer[Int]():初始容量为16,并发情况下当array长度为16,但是size0已经大于16,并且array没有及时扩容时,就会报索引越界。

所以,在并发环境下,要注意调用该方法时的线程安全问题,比如利用synchronized做锁处理。

这里只是以ArrayBuffer为例,对于Scala中其他的集合使用时也要注意,防止类似问题的出现影响程序的正常运行。


关联文章:
学好Spark必须要掌握的Scala技术点

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存