Пусть Жираф был не прав,
Но виновен не Жираф,
А тот, кто крикнул из ветвей:
«Жираф большой — ему видней!» (с)
Потребовалось оперативно разобраться с технологией Apache Spark заточенную для использования Big Data. В процессе выяснения активно использовал habrahabr, так что попробую вернуть информационный должок, поделившись приобретенным опытом.
А именно: установкой системы с нуля, настройкой и собственно программированием кода решающего задачу обработки данных для создания модели, вычисляющей вероятность банкротства клиента банка по набору таких признаков как сумма кредита, ставка и т.д.
Больших данных вроде как должно быть много, но почему-то не просто найти то злачное место, где их все щупают. Сначала попробовал вариант с ambari, но на моей Window7 валились ошибки настроек сетевого моста. В итоге прокатил вариант с преднастроенной виртуальной машиной от Cloudera (CDH). Просто устанавливаем VirtualBox, запускаем скачанный файл, указываем основные параметры (память, место) и через 5 минут достопочтенный джин Apache Hadoop жаждет ваших указаний.
Несколько слов, почему именно Spark. Насколько я понимаю, ключевые отличия от изначальной MapReduce в том, что данные удерживаются в памяти, вместо сброса на диск, что дает ускорение во много раз. Но, пожалуй, более важны реализации целого ряда статистических функций и удобным интерфейсом для загрузки/обработки данных.
Дальше собственно код для решения следующей задачи. Есть реально большие данные (ибо рука очень устает скролить эти 2000 строк) в формате:
Есть предположение, что дефолт как-то связан с остальными параметрами (кроме первого, к уважаемым Ивановым1…N претензий нет) и нужно построить модель линейной регрессии. Прежде чем начать, стоит оговориться, что это мой первый код на Java, сам я работаю аналитиком и вообще это мой первый запуск Eclipse, настройка Maven и т.д. Так что не стоит ждать изысканных чудес, ниже решение задачи в лоб тем способом, который почему-то заработал. Поехали:
1. Создаем Spark сессию. Важный момент – это все работает только с версии 2.0.0, тогда как в поставке CDH идет v1.6. Так что нужно сделать апгрейд, иначе будет исключение при запуске.
SparkSession ss = SparkSession
.builder()
.appName("Bankrupticy analyser")
.getOrCreate();
JavaRDD<Client> peopleRDD = ss.read()
.textFile(filename)
.javaRDD()
.map(new Function<String, Client>()
{
public Client call(String line) throws Exception
{
String[] parts = line.split(","); // Разделитель
Client client = new Client();
client.setName(parts[0]); // Парсим поля (ФИО в первой колонке)
client.setYearOfBirth(Double.parseDouble(parts[1]));
client.setAmount(Double.parseDouble(parts[2]));
client.setTerm(Double.parseDouble(parts[3]));
client.setRate(Double.parseDouble(parts[4]));
client.setPaid(Double.parseDouble(parts[5]));
client.setStatus(Double.parseDouble(parts[6])); // Тут признак банкротства (1 - банкрот, 0 – пока еще платит)
return client;
}
});
Dataset<Row> clientDF = ss.createDataFrame(peopleRDD, Client.class);
clientDF.createOrReplaceTempView("client");
Dataset<Row> scaledData = ss.sql(
"SELECT name, (minYearOfBirth - yearOfBirth) / (minYearOfBirth - maxYearOfBirth),"
+ "(minAmount - amount) / (minAmount - maxAmount),"
+ "(minTerm - term) / (minTerm - maxTerm),"
+ "(minRate - rate) / (minRate - maxRate),"
+ "(minPaid - paid) / (minPaid - maxPaid),"
+ "(minStatus - status) / (minStatus - maxStatus) "
+ "FROM client CROSS JOIN "
+ "(SELECT min(yearOfBirth) AS minYearOfBirth, max(yearOfBirth) AS maxYearOfBirth,"
+ "min(amount) AS minAmount, max(amount) AS maxAmount,"
+ "min(term) AS minTerm , max(term) AS maxTerm,"
+ "min(rate) AS minRate, max(rate) AS maxRate,"
+ "min(paid) AS minPaid, max(paid) AS maxPaid,"
+ "min(status) AS minStatus, max(status) AS maxStatus "
+ "FROM client)").cache();
JavaRDD<Row> rowData = scaledData.javaRDD(); // Dataset to JavaRDD
JavaRDD<Tuple2<String,LabeledPoint>> parsedData = rowData.map(
new Function<Row, Tuple2<String,LabeledPoint>>()
{
public Tuple2<String,LabeledPoint> call(Row row)
{
int last = row.length();
String cname = row.getString(0); // Первый элемент - ФИО
double label = row.getDouble(last - 1); // Последний – признак дефолта
double[] v = new double[last];
for (int i = 1; i < last - 1; i++) // Посередине независимые переменные
v[i] = row.getDouble(i);
v[last - 1] = 1; // +intercept
return new Tuple2<String, LabeledPoint>
(cname, new LabeledPoint(label, Vectors.dense(v)));
}
});
JavaRDD<LabeledPoint> parsedDataToTrain = parsedData.map(
new Function<Tuple2<String,LabeledPoint>, LabeledPoint>()
{
public LabeledPoint call(Tuple2<String,LabeledPoint> namedTuple)
{
return namedTuple._2(); // 2 означает второй элемент в составе <String,LabeledPoint>
}
});
parsedData.cache();
int numIterations = 200;
double stepSize = 2;
final LinearRegressionModel model
= LinearRegressionWithSGD.train(JavaRDD.toRDD(parsedDataToTrain), numIterations, stepSize);
final NumberFormat nf = NumberFormat.getInstance(); // Для красоты вывода чисел
nf.setMaximumFractionDigits(2);
JavaRDD<Tuple2<Double, Double>> valuesAndPreds = parsedData.map(
new Function<Tuple2<String,LabeledPoint>, Tuple2<Double, Double>>()
{
public Tuple2<Double, Double> call(Tuple2<String,LabeledPoint> namedTuple)
{
double prediction = model.predict(namedTuple._2().features()); // Расчет зависимой переменной для набора признаков данного клиента
System.out.println(namedTuple._1() + " got the score " + nf.format(prediction)
+ ". The real status is " + nf.format(namedTuple._2().label()));
return new Tuple2<Double, Double>(prediction, namedTuple._2().label());
}
});
double MSE = new JavaDoubleRDD(valuesAndPreds.map(
new Function<Tuple2<Double, Double>, Object>()
{
public Object call(Tuple2<Double, Double> pair)
{
return Math.pow(pair._1() - pair._2(), 2.0);
}
}).rdd()).mean();
К сожалению, не доступен сервер mySQL