uncategorized

spark basic 1

date: 2018-12-19 18:01:17

spark java code example

๋กœ์ปฌ ๋ชจ๋“œ๋กœ ์‹คํ–‰๋œ ์ŠคํŒŒํฌ๋กœ csv๋ฅผ ์ฝ๊ณ , ํ•„ํ„ฐ๋งํ•˜์—ฌ ์ •์ œ๋œ ๋ฐ์ดํ„ฐ๋ฅผ DB์— ๋„ฃ๋Š” ์˜ˆ์ œ

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package chapter1;

import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.concat;
import static org.apache.spark.sql.functions.lit;

import java.util.Properties;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

/**
 * Minimal Spark batch job: reads a CSV file with a local-mode Spark session,
 * derives a {@code full_name} column, keeps only the rows whose
 * {@code comment} contains a digit, sorts by {@code last_name}, and writes the
 * result to the MySQL table {@code project1} over JDBC.
 */
public class Application {

    /**
     * Runs the CSV-to-database pipeline.
     *
     * @param args command-line arguments (unused)
     * @throws InterruptedException declared for backward compatibility with
     *         existing callers; nothing in this method throws it directly
     */
    public static void main(String[] args) throws InterruptedException {

        // Create (or reuse) a session that runs Spark inside this JVM.
        SparkSession spark = SparkSession.builder()
                .appName("csv to db")
                .master("local")
                .getOrCreate();

        try {
            // Ingest: the first line of the file supplies the column names.
            Dataset<Row> raw = spark.read().format("csv")
                    .option("header", true)
                    .load("src/main/resources/name_and_comments.txt");

            raw.show(3);

            // Transform: build "last_name, first_name", keep rows whose comment
            // matches the regex \d+ (i.e. contains at least one digit), and
            // sort ascending by last name.
            Dataset<Row> cleaned = raw
                    .withColumn("full_name",
                            concat(raw.col("last_name"), lit(", "), raw.col("first_name")))
                    .filter(raw.col("comment").rlike("\\d+"))
                    .orderBy(raw.col("last_name").asc());

            cleaned.show(3);

            // Load: connection settings for the destination database.
            // NOTE(review): credentials are hard-coded for the sake of the
            // example; externalize them before using this anywhere real.
            String dbConnectionUrl = "jdbc:mysql://127.0.0.1:3306/testdb";
            Properties prop = new Properties();
            prop.setProperty("driver", "com.mysql.jdbc.Driver");
            prop.setProperty("user", "testuser");
            prop.setProperty("password", "password");

            // Overwrite mode replaces any existing contents of "project1".
            cleaned.write()
                    .mode(SaveMode.Overwrite)
                    .jdbc(dbConnectionUrl, "project1", prop);
        } finally {
            // Always release the Spark context, even when reading, transforming
            // or the JDBC write fails; otherwise the session is leaked.
            spark.stop();
        }
    }
}

DB์— ์ž๋™์œผ๋กœ ํ…Œ์ด๋ธ”์ด ์ƒ์„ฑ๋˜์–ด, ๋ฐ์ดํ„ฐ๊ฐ€ ๋“ค์–ด๊ฐ.

1
2
3
4
5
6
-- Definition of the `project1` table targeted by the Spark JDBC write above.
-- Every column, including the derived `full_name`, is declared as text.
CREATE TABLE `project1` (
`last_name` text COLLATE utf8mb4_bin,
`first_name` text COLLATE utf8mb4_bin,
`comment` text COLLATE utf8mb4_bin,
`full_name` text COLLATE utf8mb4_bin
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

관련 git code

tags:

Share