From a6cd9ae918676cc3a8c9db2e1f4ba53d014016fc Mon Sep 17 00:00:00 2001 From: Cheetozz Date: Fri, 29 Mar 2019 13:52:26 +0300 Subject: [PATCH] Initial commit --- .gitignore | 19 ++++++ build.sbt | 29 +++++++++ concatenator.sh | 29 +++++++++ project/assembly.sbt | 1 + project/build.properties | 1 + src/main/resources/application.conf | 17 +++++ src/main/resources/log4j.properties | 27 ++++++++ .../scala/ru/sa2/hive/concatenator/Main.scala | 64 +++++++++++++++++++ .../concatenator/RemainingTasksMonitor.scala | 34 ++++++++++ .../concatenator/StatisticCalculator.scala | 37 +++++++++++ .../sa2/hive/concatenator/TablesFilter.scala | 23 +++++++ .../sa2/hive/concatenator/config/Config.scala | 27 ++++++++ .../hive/concatenator/core/Compressor.scala | 59 +++++++++++++++++ .../concatenator/core/PartitionTask.scala | 11 ++++ .../core/PartitionTaskGenerator.scala | 31 +++++++++ .../hive/concatenator/core/TableInfo.scala | 25 ++++++++ .../sa2/hive/concatenator/dao/DBService.scala | 28 ++++++++ .../sa2/hive/concatenator/dao/Executor.scala | 41 ++++++++++++ .../hive/concatenator/dao/HiveClient.scala | 19 ++++++ .../hive/concatenator/helper/HdfsHelper.scala | 53 +++++++++++++++ .../helper/HiveMetastoreHelper.scala | 55 ++++++++++++++++ src/test/resources/testconfig.conf | 20 ++++++ src/test/scala/ConfigTest.scala | 33 ++++++++++ .../hive/concatenator/HdfsHelperTest.scala | 41 ++++++++++++ .../hive/concatenator/TablesFilterTest.scala | 51 +++++++++++++++ 25 files changed, 775 insertions(+) create mode 100644 .gitignore create mode 100644 build.sbt create mode 100644 concatenator.sh create mode 100644 project/assembly.sbt create mode 100644 project/build.properties create mode 100644 src/main/resources/application.conf create mode 100644 src/main/resources/log4j.properties create mode 100644 src/main/scala/ru/sa2/hive/concatenator/Main.scala create mode 100644 src/main/scala/ru/sa2/hive/concatenator/RemainingTasksMonitor.scala create mode 100644 src/main/scala/ru/sa2/hive/concatenator/StatisticCalculator.scala create mode 100644 src/main/scala/ru/sa2/hive/concatenator/TablesFilter.scala create mode 100644 src/main/scala/ru/sa2/hive/concatenator/config/Config.scala create mode 100644 src/main/scala/ru/sa2/hive/concatenator/core/Compressor.scala create mode 100644 src/main/scala/ru/sa2/hive/concatenator/core/PartitionTask.scala create mode 100644 src/main/scala/ru/sa2/hive/concatenator/core/PartitionTaskGenerator.scala create mode 100644 src/main/scala/ru/sa2/hive/concatenator/core/TableInfo.scala create mode 100644 src/main/scala/ru/sa2/hive/concatenator/dao/DBService.scala create mode 100644 src/main/scala/ru/sa2/hive/concatenator/dao/Executor.scala create mode 100644 src/main/scala/ru/sa2/hive/concatenator/dao/HiveClient.scala create mode 100644 src/main/scala/ru/sa2/hive/concatenator/helper/HdfsHelper.scala create mode 100644 src/main/scala/ru/sa2/hive/concatenator/helper/HiveMetastoreHelper.scala create mode 100644 src/test/resources/testconfig.conf create mode 100644 src/test/scala/ConfigTest.scala create mode 100644 src/test/scala/ru/sa2/hive/concatenator/HdfsHelperTest.scala create mode 100644 src/test/scala/ru/sa2/hive/concatenator/TablesFilterTest.scala diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4c7e27d --- /dev/null +++ b/.gitignore @@ -0,0 +1,19 @@ +# Created by .ignore support plugin (hsz.mobi) +### SBT template +# Simple Build Tool +# http://www.scala-sbt.org/release/docs/Getting-Started/Directories.html#configuring-version-control + +dist/* +target/ +lib_managed/ 
+src_managed/
+project/boot/
+project/plugins/project/
+.history
+.cache
+.lib/
+### Scala template
+*.class
+*.log
+
+.idea/*
\ No newline at end of file
diff --git a/build.sbt b/build.sbt
new file mode 100644
index 0000000..6e10c34
--- /dev/null
+++ b/build.sbt
@@ -0,0 +1,29 @@
+name := "concatenator"
+
+version := "1.0"
+
+scalaVersion := "2.10.6"
+
+val sparkVersion = "1.6.1"
+val main_class = "ru.sa2.hive.concatenator.Main"
+
+
+libraryDependencies in ThisBuild ++= Seq(
+  "org.apache.spark" %% "spark-core" % sparkVersion % Provided
+  , "org.apache.spark" %% "spark-hive" % sparkVersion % Provided
+  , "org.apache.hive" % "hive-jdbc" % "1.1.0" % Provided
+  , "com.typesafe" % "config" % "0.3.1" //% Provided
+  , "org.scalatest" %% "scalatest" % "3.0.0" % Test
+  , "org.mockito" % "mockito-all" % "1.10.19" % Test
+)
+//
+//assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)
+//
+//mainClass in assembly := Some(main_class)
+//
+//assemblyShadeRules in assembly := Seq(
+//  ShadeRule.rename("org.json4s.**" -> "shaded.org.json4s.@1").inAll,
+//  ShadeRule.rename("org.apache.http.**" -> "shaded.org.apache.http.@1").inAll
+//)
+//
+//assemblyJarName in assembly := artifact.value.name + "-assembly-" + version.value + ".zip"
diff --git a/concatenator.sh b/concatenator.sh
new file mode 100644
index 0000000..969eec4
--- /dev/null
+++ b/concatenator.sh
@@ -0,0 +1,29 @@
+#!/usr/bin/env bash
+
+pattern='-Dapp.tables.filter=db.table'
+tasks='-Dapp.parallel.tasks=15'
+hdfs='-Dapp.services.hdfs=hdfs://nameservice1:8020'
+tables_owner='-Dapp.tables.owner=owner'
+hive_extra_params='-Dapp.hiveopts.hive.output.file.extension=.snappy.parquet'
+
+nohup spark-submit --class ru.sa2.hive.concatenator.Main \
+--conf spark.driver.cores=1 --conf spark.driver.memory=16G \
+--conf spark.yarn.maxAppAttempts=1 \
+--conf spark.driver.extraJavaOptions="${pattern} ${tasks} ${hdfs} ${tables_owner} ${hive_extra_params}" \
+concatenator-assembly-1.0.jar > concatenator.logs
+
+
+# -------------------------------- V2 --------------------------------
+# Variant with indexed hive options (app.hiveopts.N), as read by getHiveConfigs.
+
+pattern='-Dapp.tables.filter=^db.table'
+tasks='-Dapp.parallel.tasks=2'
+hdfs='-Dapp.services.hdfs=hdfs://nameservice1:8020'
+hive_extra_params='-Dapp.hiveopts.0="hive.output.file.extension=.gz.parquet" -Dapp.hiveopts.1="mapred.job.queue.name=queue"'
+tasks_filter_strategy='-Dapp.task.filter.strategy=default'
+
+nohup spark-submit --class ru.sa2.hive.concatenator.Main \
+--conf spark.driver.cores=1 --conf spark.driver.memory=16G \
+--conf spark.yarn.maxAppAttempts=1 \
+--conf spark.driver.extraJavaOptions="${pattern} ${tasks} ${tasks_filter_strategy} ${hdfs} ${hive_extra_params}" \
+concatenator-assembly-1.0.jar > concatenator_jbt.logs
diff --git a/project/assembly.sbt b/project/assembly.sbt
new file mode 100644
index 0000000..39c1bb8
--- /dev/null
+++ b/project/assembly.sbt
@@ -0,0 +1 @@
+addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")
diff --git a/project/build.properties b/project/build.properties
new file mode 100644
index 0000000..826c0bd
--- /dev/null
+++ b/project/build.properties
@@ -0,0 +1 @@
+sbt.version = 0.13.16
\ No newline at end of file
diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf
new file mode 100644
index 0000000..2d6076e
--- /dev/null
+++ b/src/main/resources/application.conf
@@ -0,0 +1,17 @@
+app {
+  parallel {
+    tasks = "10"
+  }
+  tables {
+    filter = "some_hive_db.*\\..*"
+  }
+  db {
+    driver = "org.apache.hive.jdbc.HiveDriver"
+    url = "jdbc:hive2://hive_metastore:10000/default"
+    user = ${?USER}
+  }
+  services {
+    hive_metastore_uri = "thrift://hive_metastore:9083"
+    hdfs = "hdfs://nameservice1:8020"
+  }
+}
\ No newline at end of file
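Note on application.conf: the -D flags set in concatenator.sh take precedence over file values, and ${?USER} is an optional substitution, so app.db.user is only defined when the USER environment variable is set. A minimal sketch of how this resolves (ConfigResolutionSketch is an illustrative name, not part of the patch):

object ConfigResolutionSketch extends App {
  import com.typesafe.config.ConfigFactory

  // System properties win over file values, which is why concatenator.sh can
  // override defaults with -Dapp.parallel.tasks=15 and friends.
  val config = ConfigFactory.load("application.conf")

  println(config.getInt("app.parallel.tasks")) // "10" from the file unless a -D flag overrides it

  // Optional substitution: the path is absent when USER is unset, so guard the lookup.
  if (config.hasPath("app.db.user")) println(config.getString("app.db.user"))
}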
diff --git a/src/main/resources/log4j.properties b/src/main/resources/log4j.properties
new file mode 100644
index 0000000..35a1d93
--- /dev/null
+++ b/src/main/resources/log4j.properties
@@ -0,0 +1,27 @@
+# Root logger option (the FILE appender below is commented out, so it is not attached here)
+log4j.rootLogger=INFO, stdout
+
+# Direct log messages to stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
+
+
+#log4j.appender.FILE=org.apache.log4j.FileAppender
+
+## Set the name of the file
+#log4j.appender.FILE.File=/home/username/concatenator.log
+#
+## Set the immediate flush to true (default)
+#log4j.appender.FILE.ImmediateFlush=true
+#
+## Set the threshold to debug mode
+#log4j.appender.FILE.Threshold=debug
+#
+## Set append to true (do not overwrite)
+#log4j.appender.FILE.Append=true
+#
+## Define the layout for file appender
+#log4j.appender.FILE.layout=org.apache.log4j.PatternLayout
+#log4j.appender.FILE.layout.conversionPattern=%m%n
diff --git a/src/main/scala/ru/sa2/hive/concatenator/Main.scala b/src/main/scala/ru/sa2/hive/concatenator/Main.scala
new file mode 100644
index 0000000..b54ff01
--- /dev/null
+++ b/src/main/scala/ru/sa2/hive/concatenator/Main.scala
@@ -0,0 +1,64 @@
+package ru.sa2.hive.concatenator
+
+import java.util.concurrent.{ExecutorService, Executors}
+
+import com.typesafe.config.ConfigFactory
+import org.slf4j.LoggerFactory
+import ru.sa2.hive.concatenator.core.{Compressor, PartitionTask, PartitionTaskGenerator}
+import ru.sa2.hive.concatenator.helper.{HdfsHelper, HiveMetastoreHelper}
+
+
+object Main extends App {
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  implicit val config = ConfigFactory.load("application.conf")
+
+  lazy val pool: ExecutorService = Executors.newFixedThreadPool(config.getInt("app.parallel.tasks"))
+
+  val metastoreHelper = new HiveMetastoreHelper()
+
+  Class.forName(config.getString("app.db.driver"))
+
+  val tables = metastoreHelper.allTables(new TablesFilter(config.getString("app.tables.filter")))
+  tables.map(_.name()).foreach(LOG.info)
+
+  // Statistics before compaction
+  new StatisticCalculator(tables).printStatistics()
+
+  val tasks = tables
+    .flatMap(table => new PartitionTaskGenerator(table).generatePartitionTasks())
+    .filterNot(genFilter())
+
+  val results = tasks
+    .map { task =>
+      LOG.info(s"Submitted task for ${task.table.name()} ${task.partition}")
+      pool.submit(new Compressor(task))
+    }
+
+  LOG.info(s"Number of tasks ${tasks.length}")
+
+  val watchdog = new Thread(new RemainingTasksMonitor(results))
+  watchdog.start()
+  watchdog.join()
+
+  pool.shutdown()
+
+  // Statistics after compaction
+  new StatisticCalculator(tables).printStatistics()
+
+  def genFilter(): PartitionTask => Boolean = {
+    val default: PartitionTask => Boolean = task => HdfsHelper.sufficientFileSize(task.path)
+
+    if (config.hasPath("app.task.filter.strategy")) {
+      config.getString("app.task.filter.strategy") match {
+        case "force" => (_: PartitionTask) => false
+        case "default" => default
+      }
+    }
+    else {
+      default
+    }
+
+  }
+}
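The filter returned by genFilter decides which partitions are recompacted: by default a partition whose files already look fine is dropped, while app.task.filter.strategy=force (see concatenator.sh) keeps every generated task. A minimal sketch of that selection logic, with a stub standing in for PartitionTask (all names below are illustrative, not part of the patch):

object FilterStrategySketch extends App {
  // TaskStub plays the role of PartitionTask; "alreadyCompact" stands in for
  // HdfsHelper.sufficientFileSize(task.path).
  case class TaskStub(alreadyCompact: Boolean)

  def genFilter(strategy: Option[String]): TaskStub => Boolean = {
    val default: TaskStub => Boolean = _.alreadyCompact
    strategy match {
      case Some("force") => _ => false // "force" recompacts everything
      case _             => default    // skip partitions that already look fine
    }
  }

  val tasks = Seq(TaskStub(alreadyCompact = true), TaskStub(alreadyCompact = false))
  assert(tasks.filterNot(genFilter(None)).size == 1)          // only the fragmented partition
  assert(tasks.filterNot(genFilter(Some("force"))).size == 2) // everything
}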
diff --git a/src/main/scala/ru/sa2/hive/concatenator/RemainingTasksMonitor.scala b/src/main/scala/ru/sa2/hive/concatenator/RemainingTasksMonitor.scala
new file mode 100644
index 0000000..c9c1baa
--- /dev/null
+++ b/src/main/scala/ru/sa2/hive/concatenator/RemainingTasksMonitor.scala
@@ -0,0 +1,34 @@
+package ru.sa2.hive.concatenator
+
+import java.util.concurrent.{Future, TimeUnit}
+
+import org.slf4j.LoggerFactory
+import scala.util.control.Breaks._
+
+
+class RemainingTasksMonitor(tasks: Seq[Future[_]]) extends Runnable {
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  override def run(): Unit = {
+
+    var previous = tasks.size
+
+    breakable {
+      while (true) {
+        val remaining = tasks.count(!_.isDone)
+
+        if (remaining == 0) break
+        if (remaining != previous) {
+          printInfo(remaining)
+          previous = remaining
+        }
+        TimeUnit.SECONDS.sleep(3)
+      }
+    }
+
+  }
+
+  def printInfo(n: Int) = LOG.info(s"Remaining $n tasks")
+
+}
diff --git a/src/main/scala/ru/sa2/hive/concatenator/StatisticCalculator.scala b/src/main/scala/ru/sa2/hive/concatenator/StatisticCalculator.scala
new file mode 100644
index 0000000..9b65b53
--- /dev/null
+++ b/src/main/scala/ru/sa2/hive/concatenator/StatisticCalculator.scala
@@ -0,0 +1,37 @@
+package ru.sa2.hive.concatenator
+
+import org.apache.hadoop.fs.Path
+import org.slf4j.LoggerFactory
+import ru.sa2.hive.concatenator.core.TableInfo
+import ru.sa2.hive.concatenator.helper.HdfsHelper
+
+
+class StatisticCalculator(tables: Seq[TableInfo]) {
+
+  private val LOG = LoggerFactory.getLogger(this.getClass)
+
+  def printStatistics() = {
+    tables.foreach { table =>
+      val allFiles = table.listPartitions().flatMap { partition =>
+        HdfsHelper.lengthsFilesIn(new Path(s"${table.rootLocation()}/$partition")) }
+
+      LOG.info(s"Calculating stats for ${table.name()}")
+      LOG.info(
+        s"""
+           |Table: ${table.name()}
+           |Files: ${allFiles.length}
+           |MeanSize: ${meanFileSizeInTable(allFiles)} bytes
+           |-----------------------------------------------------
+         """.stripMargin)
+
+    }
+  }
+
+  private def meanFileSizeInTable(lengths: Seq[Long]) = {
+    if (lengths.nonEmpty) // TODO: rewrite with a fold
+      lengths.sum / lengths.length
+    else
+      0L
+  }
+
+}
diff --git a/src/main/scala/ru/sa2/hive/concatenator/TablesFilter.scala b/src/main/scala/ru/sa2/hive/concatenator/TablesFilter.scala
new file mode 100644
index 0000000..a890916
--- /dev/null
+++ b/src/main/scala/ru/sa2/hive/concatenator/TablesFilter.scala
@@ -0,0 +1,23 @@
+package ru.sa2.hive.concatenator
+
+import org.slf4j.LoggerFactory
+import ru.sa2.hive.concatenator.core.TableInfo
+
+class TablesFilter(filter: String) {
+
+  val LOG = LoggerFactory.getLogger(getClass)
+
+  val dbRegexps = filter.split(",").map(_.split('.')(0).r)
+
+  LOG.info(s"filter: $filter")
+  LOG.info(s"regexp db: ${dbRegexps.mkString(", ")}")
+
+  val tableRegexps = filter.split(",").map(_.r)
+
+  LOG.info(s"regexp tbl: ${tableRegexps.mkString(", ")}")
+
+  def validateDB(db: String): Boolean = dbRegexps.exists(_.findFirstIn(db).nonEmpty)
+
+  def validateTable(table: TableInfo): Boolean = tableRegexps.exists(_.findFirstIn(table.name()).nonEmpty)
+
+}
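TablesFilter applies the comma-separated pattern in two stages: the text before the first '.' of each pattern prunes whole databases before any table objects are fetched, and the full pattern then matches the "db.table" name. A short sketch mirroring TablesFilterTest (the assertions are illustrative):

object TablesFilterSketch extends App {
  import ru.sa2.hive.concatenator.TablesFilter

  val filter = new TablesFilter("tmp_.*\\..*ololo")

  // db-level regex is "tmp_": whole databases are pruned without metastore calls
  assert(filter.validateDB("tmp_ololo"))
  assert(!filter.validateDB("asdad"))

  // validateTable then matches the full "db.table" name against the whole pattern,
  // keeping e.g. "tmp_ololo.sddololo" and dropping "tmp_ololo.sdd"
  // (it takes a TableInfo, so see TablesFilterTest for the mocked version).
}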
diff --git a/src/main/scala/ru/sa2/hive/concatenator/config/Config.scala b/src/main/scala/ru/sa2/hive/concatenator/config/Config.scala
new file mode 100644
index 0000000..e8fbd8d
--- /dev/null
+++ b/src/main/scala/ru/sa2/hive/concatenator/config/Config.scala
@@ -0,0 +1,27 @@
+//package ru.sa2.hive.concatenator.config
+//
+//import com.typesafe.config.ConfigFactory
+//
+//import scala.util.Properties
+//
+//class Config(fileNameOption: Option[String] = None) {
+//
+//  val inner = fileNameOption.fold(
+//    ifEmpty = ConfigFactory.load())(
+//    file => ConfigFactory.load(file))
+//
+//  def envOrConfig(name: String): String = {
+//    Properties.envOrElse(
+//      name.toUpperCase.replace(".", "_"), // String.replace is literal, not regex
+//      inner.getString(name)
+//    )
+//  }
+//
+//  def propOrConfig(name: String): String = {
+//    Properties.propOrElse(
+//      name.toUpperCase.replace(".", "_"),
+//      inner.getString(name)
+//    )
+//  }
+//
+//}
diff --git a/src/main/scala/ru/sa2/hive/concatenator/core/Compressor.scala b/src/main/scala/ru/sa2/hive/concatenator/core/Compressor.scala
new file mode 100644
index 0000000..aacae38
--- /dev/null
+++ b/src/main/scala/ru/sa2/hive/concatenator/core/Compressor.scala
@@ -0,0 +1,59 @@
+package ru.sa2.hive.concatenator.core
+
+import java.util.concurrent.Callable
+
+import com.typesafe.config.Config
+import org.slf4j.LoggerFactory
+import ru.sa2.hive.concatenator.dao.DBService
+import ru.sa2.hive.concatenator.helper.HdfsHelper
+import scala.collection.JavaConverters._
+
+class Compressor(task: PartitionTask)(implicit config: Config) extends Callable[Double] {
+
+  val Log = LoggerFactory.getLogger(this.getClass)
+
+  override def call(): Double = {
+    Log.info(s"Start compressing ${task.table.name()} ${task.partition}")
+
+    lazy val connection = DBService.getConnectionForUser(getTableOwner())
+
+//    connection.setProp("hive.output.file.extension", ".gz.parquet")
+//    connection.setProp("hive.exec.dynamic.partition", "true")
+//    connection.setProp("hive.exec.dynamic.partition.mode", "nonstrict")
+
+    getHiveConfigs().foreach { case (key, value) =>
+      Log.info(s"Set extra hive param: '$key' = '$value'")
+      connection.setProp(key, value)
+    }
+
+    connection.setProp("hive.exec.dynamic.partition", "true")
+    connection.setProp("hive.exec.dynamic.partition.mode", "nonstrict")
+    connection.setProp("mapred.job.name", s"${task.generateJobName()}")
+    connection.setProp("mapreduce.job.reduces", HdfsHelper.numReduceTasks(task.path).toString)
+    connection.execUpdate(task.query)
+
+    connection.close()
+
+    Log.info(s"Compressed ${task.table.name()} ${task.partition}")
+
+    val lengths = HdfsHelper.lengthsFilesIn(task.path).toList
+    if (lengths.nonEmpty) lengths.sum.toDouble / lengths.length else 0.0
+  }
+
+  private def getHiveConfigs(): Map[String, String] = {
+    if (!config.hasPath("app.hiveopts")) Map.empty
+    else config.getStringList("app.hiveopts").asScala
+      .map { param =>
+        val eq = param.indexOf("=")
+        param.take(eq) -> param.drop(eq + 1)
+      }.toMap
+  }
+
+  private def getTableOwner() = {
+    if (config.hasPath("app.tables.owner"))
+      config.getString("app.tables.owner")
+    else
+      task.table.owner()
+  }
+
+}
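The hive options arrive as "key=value" strings and are split on the first '=' only, so values that themselves contain '=' survive intact. The split in isolation (HiveOptsSketch is an illustrative name):

object HiveOptsSketch extends App {
  // indexOf finds the FIRST '=', so "a.b=x=y" parses as ("a.b", "x=y").
  val param = "hive.output.file.extension=.gz.parquet"
  val eq = param.indexOf("=")
  val pair = param.take(eq) -> param.drop(eq + 1)
  assert(pair == ("hive.output.file.extension" -> ".gz.parquet"))
}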
diff --git a/src/main/scala/ru/sa2/hive/concatenator/core/PartitionTask.scala b/src/main/scala/ru/sa2/hive/concatenator/core/PartitionTask.scala
new file mode 100644
index 0000000..f25261b
--- /dev/null
+++ b/src/main/scala/ru/sa2/hive/concatenator/core/PartitionTask.scala
@@ -0,0 +1,11 @@
+package ru.sa2.hive.concatenator.core
+
+import org.apache.hadoop.fs.Path
+
+class PartitionTask(val table: TableInfo, val path: Path, val query: String, val partition: String) {
+
+  def generateJobName(): String = {
+    s"Compressing ${table.name()} $partition"
+  }
+
+}
diff --git a/src/main/scala/ru/sa2/hive/concatenator/core/PartitionTaskGenerator.scala b/src/main/scala/ru/sa2/hive/concatenator/core/PartitionTaskGenerator.scala
new file mode 100644
index 0000000..26528d1
--- /dev/null
+++ b/src/main/scala/ru/sa2/hive/concatenator/core/PartitionTaskGenerator.scala
@@ -0,0 +1,31 @@
+package ru.sa2.hive.concatenator.core
+
+import org.apache.hadoop.fs.Path
+
+class PartitionTaskGenerator(table: TableInfo) {
+
+  def generatePartitionTasks(): List[PartitionTask] = {
+    table.listPartitions().map(partition =>
+      new PartitionTask(
+        table,
+        new Path(s"${table.rootLocation()}/$partition"),
+        generateQuery(partition),
+        partition
+      ))
+  }
+
+  private def generatePartitionClause(parts: String): String =
+    parts.split("/").map(_.split("=").head).mkString(", ")
+
+  private def generateWhereClause(parts: String): String =
+    parts.split("/").map(_.split("=")).map(a => s"${a(0)} = '${a(1)}'").mkString(" AND ")
+
+
+  private def generateQuery(partition: String): String =
+    if (table.isPartitioned())
+      s"INSERT OVERWRITE TABLE ${table.name()} PARTITION (${generatePartitionClause(partition)}) SELECT DISTINCT * FROM ${table.name()} WHERE ${generateWhereClause(partition)}"
+    else
+      s"INSERT OVERWRITE TABLE ${table.name()} SELECT DISTINCT * FROM ${table.name()}"
+
+
+}
diff --git a/src/main/scala/ru/sa2/hive/concatenator/core/TableInfo.scala b/src/main/scala/ru/sa2/hive/concatenator/core/TableInfo.scala
new file mode 100644
index 0000000..2adf4c7
--- /dev/null
+++ b/src/main/scala/ru/sa2/hive/concatenator/core/TableInfo.scala
@@ -0,0 +1,25 @@
+package ru.sa2.hive.concatenator.core
+
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient
+import org.apache.hadoop.hive.metastore.api.Table
+
+import scala.collection.JavaConverters._
+
+class TableInfo(val table: Table, metastore: HiveMetaStoreClient) {
+
+
+  def name(): String = table.getDbName + "." + table.getTableName
+
+  def isPartitioned(): Boolean = table.getPartitionKeysSize > 0
+
+  def listPartitions() =
+    if (isPartitioned())
+      metastore.listPartitionNames(table.getDbName, table.getTableName, 0).asScala.toList
+    else
+      List("")
+
+  def rootLocation() = table.getSd.getLocation.replaceAll("hdfs://nameservice1", "")
+
+  def owner() = table.getOwner
+
+}
diff --git a/src/main/scala/ru/sa2/hive/concatenator/dao/DBService.scala b/src/main/scala/ru/sa2/hive/concatenator/dao/DBService.scala
new file mode 100644
index 0000000..97c87a6
--- /dev/null
+++ b/src/main/scala/ru/sa2/hive/concatenator/dao/DBService.scala
@@ -0,0 +1,28 @@
+package ru.sa2.hive.concatenator.dao
+
+import java.sql.{Connection, DriverManager}
+
+import com.typesafe.config.ConfigFactory
+import org.slf4j.LoggerFactory
+
+import scala.util.Properties
+
+
+object DBService {
+  private val LOG = LoggerFactory.getLogger(this.getClass)
+  private val config = ConfigFactory.load("application.conf")
+
+  def apply(): DBService = new DBService(connection = getConnection())
+
+  def getConnectionForUser(user: String): HiveClient = {
+    LOG.info(s"Try to open connection for user: $user")
+    HiveClient(DriverManager.getConnection(config.getString("app.db.url"), user, ""))
+  }
+
+  private def getConnection(): Connection = DriverManager.getConnection(config.getString("app.db.url"), config.getString("app.db.user"), "")
+}
+
+class DBService(val connection: Connection) {
+
+  def cleanup() = connection.close()
+}
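Rewriting each partition onto itself (INSERT OVERWRITE ... SELECT DISTINCT *) with a bounded mapreduce.job.reduces is what merges the small files. A worked example of the clause generation, reusing the partition string from the commented-out tests in HdfsHelperTest ("db.t" is a placeholder table name):

object QuerySketch extends App {
  // Same transformations as generatePartitionClause / generateWhereClause.
  val partition = "month=201707/first_id=38/second_id=7440"
  val partitionClause = partition.split("/").map(_.split("=").head).mkString(", ")
  val whereClause = partition.split("/").map(_.split("=")).map(a => s"${a(0)} = '${a(1)}'").mkString(" AND ")

  assert(partitionClause == "month, first_id, second_id")
  assert(whereClause == "month = '201707' AND first_id = '38' AND second_id = '7440'")

  println(s"INSERT OVERWRITE TABLE db.t PARTITION ($partitionClause) SELECT DISTINCT * FROM db.t WHERE $whereClause")
}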
diff --git a/src/main/scala/ru/sa2/hive/concatenator/dao/Executor.scala b/src/main/scala/ru/sa2/hive/concatenator/dao/Executor.scala
new file mode 100644
index 0000000..f04865a
--- /dev/null
+++ b/src/main/scala/ru/sa2/hive/concatenator/dao/Executor.scala
@@ -0,0 +1,41 @@
+package ru.sa2.hive.concatenator.dao
+
+import java.sql.{Connection, ResultSet}
+
+import org.slf4j.LoggerFactory
+
+
+object Executor {
+  def apply(connection: Connection): Executor = new Executor(connection)
+}
+
+class Executor(connection: Connection) {
+
+  private val LOG = LoggerFactory.getLogger(this.getClass)
+
+  def execUpdate(query: String): Int = {
+    LOG.debug(s"Running query:\t $query")
+    val stmt = connection.createStatement()
+    val updated = stmt.executeUpdate(query)
+    LOG.debug("Closing statement")
+    stmt.close()
+    updated
+  }
+
+  def execQuery[T](query: String, f: ResultSet => T): T = {
+    LOG.debug(s"Running query:\t $query")
+    val stmt = connection.createStatement()
+    val rs = stmt.executeQuery(query)
+    LOG.debug("Calculating result")
+    val value = f(rs)
+    try {
+      LOG.debug("Closing rs and statement")
+      rs.close()
+      stmt.close()
+    } catch {
+      case _: Exception => LOG.warn("Failed to close ResultSet or Statement")
+    }
+    value
+  }
+
+}
diff --git a/src/main/scala/ru/sa2/hive/concatenator/dao/HiveClient.scala b/src/main/scala/ru/sa2/hive/concatenator/dao/HiveClient.scala
new file mode 100644
index 0000000..0be825f
--- /dev/null
+++ b/src/main/scala/ru/sa2/hive/concatenator/dao/HiveClient.scala
@@ -0,0 +1,19 @@
+package ru.sa2.hive.concatenator.dao
+
+import java.sql.{Connection, ResultSet}
+
+
+object HiveClient {
+  def apply(connection: Connection): HiveClient = new HiveClient(connection)
+}
+
+class HiveClient(connection: Connection) {
+
+  def setProp(key: String, value: String) = execUpdate(s"SET $key=$value")
+
+  def execUpdate(query: String) = Executor(connection).execUpdate(query)
+
+  def execQuery[T](query: String, f: ResultSet => T): T = Executor(connection).execQuery(query, f)
+
+  def close() = connection.close()
+}
diff --git a/src/main/scala/ru/sa2/hive/concatenator/helper/HdfsHelper.scala b/src/main/scala/ru/sa2/hive/concatenator/helper/HdfsHelper.scala
new file mode 100644
index 0000000..9fb2688
--- /dev/null
+++ b/src/main/scala/ru/sa2/hive/concatenator/helper/HdfsHelper.scala
@@ -0,0 +1,53 @@
+package ru.sa2.hive.concatenator.helper
+
+import com.typesafe.config.ConfigFactory
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+
+
+object HdfsHelper {
+
+  private val config = ConfigFactory.load("application.conf")
+
+  val DEFAULT_BLOCK_SIZE = 256000000.0
+
+  lazy val hdfs = getHDFS()
+
+  def isSufficientFileSize(total: Long, mean: Double): Boolean = (mean > 200000000 && mean < 300000000) || // TODO: consider file count instead of total length
+    (total < 256000000 && (mean == total))
+
+  def sufficientFileSize(path: Path): Boolean = {
+
+    if (hdfs.exists(path) && containFiles(path)) {
+      val lengths = lengthsFilesIn(path)
+      val totalSize = lengths.sum
+      val meanSize = totalSize / lengths.length
+
+      isSufficientFileSize(totalSize, meanSize)
+    }
+    else
+      true
+
+  }
+
+  private def getHDFS() = {
+    val conf = new Configuration()
+    conf.set("fs.defaultFS", config.getString("app.services.hdfs"))
+    FileSystem.get(conf)
+  }
+
+  def numReduceTasks(path: Path): Int = blocksCount(lengthsFilesIn(path))
+
+  def blocksCount(lengths: Seq[Long]): Int = {
+    math.ceil(lengths.sum / DEFAULT_BLOCK_SIZE).toInt
+  }
+
+  def lengthsFilesIn(path: Path) =
+    if (hdfs.exists(path))
+      hdfs.listStatus(path).filter(_.isFile).filterNot(_.getPath.getName.contains("_SUCCESS")).map(_.getLen)
+    else
+      Array(0L)
+
+  def containFiles(path: Path) = lengthsFilesIn(path).length != 0
+
+}
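The sizing rules are pure functions, so their arithmetic can be checked without a cluster. Worked numbers, taken from HdfsHelperTest (SizingSketch is an illustrative name):

object SizingSketch extends App {
  import ru.sa2.hive.concatenator.helper.HdfsHelper

  // A partition is "sufficient" if the mean file size sits in the 200-300 MB
  // window, or if it holds a single small file (total == mean < 256 MB).
  assert(HdfsHelper.isSufficientFileSize(800000000L, 210000000))  // mean inside the window
  assert(HdfsHelper.isSufficientFileSize(26700000L, 26700000))    // one small file
  assert(!HdfsHelper.isSufficientFileSize(800000000L, 400000000)) // oversized files, recompact

  // Reducer count = number of ~256 MB blocks the partition's data occupies.
  assert(HdfsHelper.blocksCount(Seq(200000000L)) == 1) // ceil(200e6 / 256e6) -> 1 output file
  assert(HdfsHelper.blocksCount(Seq(280000000L)) == 2) // ceil(280e6 / 256e6) -> 2 output files
}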
diff --git a/src/main/scala/ru/sa2/hive/concatenator/helper/HiveMetastoreHelper.scala b/src/main/scala/ru/sa2/hive/concatenator/helper/HiveMetastoreHelper.scala
new file mode 100644
index 0000000..dcd1c70
--- /dev/null
+++ b/src/main/scala/ru/sa2/hive/concatenator/helper/HiveMetastoreHelper.scala
@@ -0,0 +1,55 @@
+package ru.sa2.hive.concatenator.helper
+
+import com.typesafe.config.Config
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.metastore.api.LockResponse
+import org.apache.hadoop.hive.metastore.{HiveMetaStoreClient, LockComponentBuilder, LockRequestBuilder}
+import ru.sa2.hive.concatenator.TablesFilter
+import ru.sa2.hive.concatenator.core.TableInfo
+
+import scala.collection.JavaConverters._
+
+
+class HiveMetastoreHelper(implicit config: Config) {
+
+  private val metastore = getHMC()
+
+  def allDatabases = metastore.getAllDatabases.asScala
+
+  def allTables(tablesFilter: TablesFilter) = allDatabases filter { tablesFilter.validateDB } flatMap getTables filter { tablesFilter.validateTable }
+
+  def getTables(dbName: String): Seq[TableInfo] =
+    metastore.getTableObjectsByName(dbName, metastore.getAllTables(dbName)).asScala
+      .map(table => new TableInfo(table, metastore))
+
+  def lockResponse(table: TableInfo): LockResponse = {
+    val component = new LockComponentBuilder()
+      .setDbName(table.table.getDbName)
+      .setTableName(table.table.getTableName)
+      .setExclusive().build()
+    metastore.lock(new LockRequestBuilder().addLockComponent(component).setUser(table.owner()).build())
+  }
+
+  private def getHMC() = {
+    val conf = new HiveConf()
+    conf.setVar(HiveConf.ConfVars.METASTOREURIS, config.getString("app.services.hive_metastore_uri"))
+    new HiveMetaStoreClient(conf)
+  }
+
+  def getTablePartitions(tableName: String)(implicit hmc: HiveMetaStoreClient): List[Map[String, String]] = {
+
+    val Array(database, table) = tableName.split("\\.") // TODO: add a table name parser
+    val isPartitioned = hmc.getTable(database, table).getPartitionKeysSize > 0
+
+    require(isPartitioned, s"Table $tableName is not partitioned!")
+
+    hmc.listPartitionNames(database, table, 0).asScala
+      .toList
+      .map(_.split("/")
+        .map(_.split("="))
+        .map(a => a(0) -> a(1)).toMap
+      )
+  }
+
+}
diff --git a/src/test/resources/testconfig.conf b/src/test/resources/testconfig.conf
new file mode 100644
index 0000000..c97b086
--- /dev/null
+++ b/src/test/resources/testconfig.conf
@@ -0,0 +1,20 @@
+app {
+  parallel {
+    tasks = "10"
+  }
+  tables {
+    filter = "some_hive_db.*\\..*"
+  }
+  db {
+    driver = "org.apache.hive.jdbc.HiveDriver"
+    url = "jdbc:hive2://hive_metastore:10000/default"
+    user = "default"
+    user = ${?USER}
+  }
+  services {
+    hive_metastore_uri = "thrift://hive_metastore:9083"
+    hdfs = "hdfs://nameservice1:8020"
+  }
+  hiveopts = [ "hive.output.file.extension=.snappy.parquet" ]
+
+}
\ No newline at end of file
diff --git a/src/test/scala/ConfigTest.scala b/src/test/scala/ConfigTest.scala
new file mode 100644
index 0000000..5bb3349
--- /dev/null
+++ b/src/test/scala/ConfigTest.scala
@@ -0,0 +1,33 @@
+package ololo
+
+import com.typesafe.config.{Config, ConfigFactory, ConfigRenderOptions}
+
+import scala.collection.JavaConverters._
+
+object ConfigTest extends App {
+
+  val conf = ConfigFactory.load("testconfig.conf")
+
+//  println(conf.getConfig("app.hiveopts"))
+
+  println(conf.getStringList("app.hiveopts"))
+
+  println()
+
+  conf.getStringList("app.hiveopts").asScala
+    .map { param =>
+
+      val eq = param.indexOf("=")
+
+      println(param)
+      println(param.take(eq))
+      println(param.drop(eq + 1))
+
+      param.take(eq) -> param.drop(eq + 1)
+
+    }
+    .foreach(println)
+
+
+
+}
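getTablePartitions turns each metastore partition name into a key/value map by splitting on '/' and '='. The same transformation in isolation (the input values here are illustrative, not from the patch):

object PartitionParseSketch extends App {
  val names = List("month=201707/first_id=38", "month=201708/first_id=39")
  val parsed = names.map(_.split("/").map(_.split("=")).map(a => a(0) -> a(1)).toMap)

  assert(parsed.head == Map("month" -> "201707", "first_id" -> "38"))
  assert(parsed.last == Map("month" -> "201708", "first_id" -> "39"))
}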
diff --git a/src/test/scala/ru/sa2/hive/concatenator/HdfsHelperTest.scala b/src/test/scala/ru/sa2/hive/concatenator/HdfsHelperTest.scala
new file mode 100644
index 0000000..d24717b
--- /dev/null
+++ b/src/test/scala/ru/sa2/hive/concatenator/HdfsHelperTest.scala
@@ -0,0 +1,41 @@
+package ru.sa2.hive.concatenator
+
+import org.scalatest.{FunSpec, Matchers}
+import ru.sa2.hive.concatenator.helper.HdfsHelper
+
+class HdfsHelperTest extends FunSpec with Matchers {
+
+  describe("HdfsHelperTest") {
+
+    it("should isSufficientFileSize") {
+
+      HdfsHelper.isSufficientFileSize(800000000, 210000000) shouldBe true
+      HdfsHelper.isSufficientFileSize(200000000, 200000000) shouldBe true
+      HdfsHelper.isSufficientFileSize(26700000, 26700000) shouldBe true
+
+      HdfsHelper.isSufficientFileSize(800000000, 800000000) shouldBe false
+      HdfsHelper.isSufficientFileSize(20000000, 7000000) shouldBe false
+
+      HdfsHelper.isSufficientFileSize(200000000, 70000000) shouldBe false
+
+      HdfsHelper.isSufficientFileSize(800000000, 150000000) shouldBe false
+      HdfsHelper.isSufficientFileSize(800000000, 400000000) shouldBe false
+    }
+
+    it("should return correct number of reducers") {
+      HdfsHelper.blocksCount(Seq(200000000)) shouldBe 1
+      HdfsHelper.blocksCount(Seq(280000000)) shouldBe 2
+    }
+
+//    it("should generatePartitionClause") {
+////      Main.generatePartitionClause("month=201707/first_id=38/second_id=7440") shouldBe "month = '201707', first_id = '38', second_id = '7440'"
+//      Main.generatePartitionClause("month=201707/first_id=38/second_id=7440") shouldBe "month, first_id, second_id"
+//    }
+//
+//    it("should generateWhereClause") {
+//      Main.generateWhereClause("month=201707/first_id=38/second_id=7440") shouldBe "month = '201707' AND first_id = '38' AND second_id = '7440'"
+//
+//    }
+
+  }
+}
diff --git a/src/test/scala/ru/sa2/hive/concatenator/TablesFilterTest.scala b/src/test/scala/ru/sa2/hive/concatenator/TablesFilterTest.scala
new file mode 100644
index 0000000..16170e3
--- /dev/null
+++ b/src/test/scala/ru/sa2/hive/concatenator/TablesFilterTest.scala
@@ -0,0 +1,51 @@
+package ru.sa2.hive.concatenator
+
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient
+import org.apache.hadoop.hive.metastore.api.Table
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSpec, Matchers}
+import org.mockito.Mockito._
+import org.scalatest.mockito.MockitoSugar
+import ru.sa2.hive.concatenator.core.TableInfo
+
+
+class TablesFilterTest extends FunSpec with Matchers with MockitoSugar {
+
+  val tablesFilter = new TablesFilter("tmp_.*\\..*ololo")
+
+  describe("TablesFilterTest") {
+
+    it("correct filtering databases") {
+      Set("tmp_astrn", "tmp_ololo", "asdad", "asdasdasd", "tmpsdjhfsjdfh")
+        .filter(tablesFilter.validateDB) shouldBe Set("tmp_astrn", "tmp_ololo")
+    }
+
+    it("correct filtering tables") {
+      val input = Seq("tmp_astrn.sdfsdf", "tmp_ololo.sdd", "tmp_ololo.ololo", "tmp_ololo.sddololo")
+      val answer = Seq("tmp_ololo.ololo", "tmp_ololo.sddololo")
+
+      validateTables(input, tablesFilter) shouldBe answer
+    }
+
+    it("correct filtering tables max") {
+      val filter = new TablesFilter("some_db.?\\.(this.*|.*or_this.*)")
+      val input = Seq("some_db.thisx_one", "some_db.this_two", "some_db.or_this", "some_db.qqqor_this", "some_db.qqqor_thisqqq",
+        "some_db.aaa", "some_db.bbb", "some_db.ololo")
+      val answer = Seq("some_db.thisx_one", "some_db.this_two", "some_db.or_this", "some_db.qqqor_this", "some_db.qqqor_thisqqq")
+
+      validateTables(input, filter) shouldBe answer
+    }
+  }
+
+
+  def validateTables(tables: Seq[String], filter: TablesFilter): Seq[String] = tables
+    .map(name => mockTableInfo(name))
+    .filter(filter.validateTable)
+    .map(_.name())
+
+
+  def mockTableInfo(answer: String) = {
+    val ti = mock[TableInfo]
+    when(ti.name()).thenReturn(answer)
+    ti
+  }
+}