Initial commit
This commit is contained in:
19
.gitignore
vendored
Normal file
19
.gitignore
vendored
Normal file
@@ -0,0 +1,19 @@
|
|||||||
|
# Created by .ignore support plugin (hsz.mobi)
|
||||||
|
### SBT template
|
||||||
|
# Simple Build Tool
|
||||||
|
# http://www.scala-sbt.org/release/docs/Getting-Started/Directories.html#configuring-version-control
|
||||||
|
|
||||||
|
dist/*
|
||||||
|
target/
|
||||||
|
lib_managed/
|
||||||
|
src_managed/
|
||||||
|
project/boot/
|
||||||
|
project/plugins/project/
|
||||||
|
.history
|
||||||
|
.cache
|
||||||
|
.lib/
|
||||||
|
### Scala template
|
||||||
|
*.class
|
||||||
|
*.log
|
||||||
|
|
||||||
|
.idea/*
|
||||||
29
build.sbt
Normal file
29
build.sbt
Normal file
@@ -0,0 +1,29 @@
|
|||||||
|
name := "concatenator"
|
||||||
|
|
||||||
|
version := "1.0"
|
||||||
|
|
||||||
|
scalaVersion := "2.10.6"
|
||||||
|
|
||||||
|
val sparkVersion = "1.6.1"
|
||||||
|
val main_class = "ru.sa2.hive.concatenator.Main"
|
||||||
|
|
||||||
|
|
||||||
|
libraryDependencies in ThisBuild ++= Seq(
|
||||||
|
"org.apache.spark" %% "spark-core" % sparkVersion % Provided
|
||||||
|
,"org.apache.spark" %% "spark-hive" % sparkVersion % Provided
|
||||||
|
,"org.apache.hive" % "hive-jdbc" % "1.1.0" % Provided
|
||||||
|
,"com.typesafe" % "config" % "0.3.1" //% Provided
|
||||||
|
,"org.scalatest" %% "scalatest" % "3.0.0" % Test
|
||||||
|
,"org.mockito" % "mockito-all" % "1.10.19" % Test
|
||||||
|
)
|
||||||
|
//
|
||||||
|
//assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)
|
||||||
|
//
|
||||||
|
//mainClass in assembly := Some(main_class)
|
||||||
|
//
|
||||||
|
//assemblyShadeRules in assembly := Seq(
|
||||||
|
// ShadeRule.rename("org.json4s.**" -> "shaded.org.json4s.@1").inAll,
|
||||||
|
// ShadeRule.rename("org.apache.http.**" -> "shaded.org.apache.http.@1").inAll
|
||||||
|
//)
|
||||||
|
//
|
||||||
|
//assemblyJarName in assembly := artifact.value.name + "-assembly-" + version.value + ".zip"
|
||||||
29
concatenator.sh
Normal file
29
concatenator.sh
Normal file
@@ -0,0 +1,29 @@
|
|||||||
|
#!/usr/bin/env bash
|
||||||
|
|
||||||
|
pattern='-Dapp.tables.filter=db.table'
|
||||||
|
tasks='-Dapp.parallel.tasks=15'
|
||||||
|
hdfs='-Dapp.services.hdfs=hdfs://nameservice1:8020'
|
||||||
|
tables_owner='-Dapp.tables.owner=owner'
|
||||||
|
hive_extra_params='-Dapp.hiveopts.hive.output.file.extension=.snappy.parquet'
|
||||||
|
|
||||||
|
nohup spark-submit --class ru.sa2.hive.concatenator.Main \
|
||||||
|
--conf spark.driver.cores=1 --conf spark.driver.memory=16G \
|
||||||
|
--conf spark.yarn.maxAppAttempts=1 \
|
||||||
|
--conf spark.driver.extraJavaOptions="${pattern} ${tasks} ${hdfs} ${tables_owner} ${hive_extra_params}" \
|
||||||
|
concatenator-assembly-1.0.jar > concatenator.logs
|
||||||
|
|
||||||
|
|
||||||
|
//////////////////////////////////V2
|
||||||
|
#!/usr/bin/env bash
|
||||||
|
|
||||||
|
pattern='-Dapp.tables.filter=^db.table'
|
||||||
|
tasks='-Dapp.parallel.tasks=2'
|
||||||
|
hdfs='-Dapp.services.hdfs=hdfs://nameservice1:8020'
|
||||||
|
hive_extra_params='-Dapp.hiveopts.0="hive.output.file.extension=.gz.parquet" -Dapp.hiveopts.1="mapred.job.queue.name=queue"'
|
||||||
|
tasks_filter_strategy='-Dapp.task.filter.strategy=default'
|
||||||
|
|
||||||
|
nohup spark-submit --class ru.sa2.hive.concatenator.Main \
|
||||||
|
--conf spark.driver.cores=1 --conf spark.driver.memory=16G \
|
||||||
|
--conf spark.yarn.maxAppAttempts=1 \
|
||||||
|
--conf spark.driver.extraJavaOptions="${pattern} ${tasks} ${tasks_filter_strategy} ${hdfs} ${hive_extra_params}" \
|
||||||
|
concatenator-assembly-1.0.jar > concatenator_jbt.logs
|
||||||
1
project/assembly.sbt
Normal file
1
project/assembly.sbt
Normal file
@@ -0,0 +1 @@
|
|||||||
|
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")
|
||||||
1
project/build.properties
Normal file
1
project/build.properties
Normal file
@@ -0,0 +1 @@
|
|||||||
|
sbt.version = 0.13.16
|
||||||
17
src/main/resources/application.conf
Normal file
17
src/main/resources/application.conf
Normal file
@@ -0,0 +1,17 @@
|
|||||||
|
app {
|
||||||
|
parallel {
|
||||||
|
tasks = "10"
|
||||||
|
}
|
||||||
|
tables {
|
||||||
|
filter = "some_hive_db.*\\..*"
|
||||||
|
}
|
||||||
|
db {
|
||||||
|
driver = "org.apache.hive.jdbc.HiveDriver"
|
||||||
|
url = "jdbc:hive2://hive_metastore:10000/default"
|
||||||
|
user = ${?USER}
|
||||||
|
}
|
||||||
|
services {
|
||||||
|
hive_metastore_uri = "thrift://hive_metastore:9083"
|
||||||
|
hdfs = "hdfs://nameservice1:8020"
|
||||||
|
}
|
||||||
|
}
|
||||||
27
src/main/resources/log4j.properties
Normal file
27
src/main/resources/log4j.properties
Normal file
@@ -0,0 +1,27 @@
|
|||||||
|
# Root logger option
|
||||||
|
log4j.rootLogger=INFO, stdout, FILE
|
||||||
|
|
||||||
|
# Direct log messages to stdout
|
||||||
|
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
|
||||||
|
log4j.appender.stdout.Target=System.out
|
||||||
|
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
|
||||||
|
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
|
||||||
|
|
||||||
|
|
||||||
|
#log4j.appender.FILE=org.apache.log4j.FileAppender
|
||||||
|
|
||||||
|
## Set the name of the file
|
||||||
|
#log4j.appender.FILE.File=/home/username/concatenator.log
|
||||||
|
#
|
||||||
|
## Set the immediate flush to true (default)
|
||||||
|
#log4j.appender.FILE.ImmediateFlush=true
|
||||||
|
#
|
||||||
|
## Set the threshold to debug mode
|
||||||
|
#log4j.appender.FILE.Threshold=debug
|
||||||
|
#
|
||||||
|
## Set the append to false, overwrite
|
||||||
|
#log4j.appender.FILE.Append=true
|
||||||
|
#
|
||||||
|
## Define the layout for file appender
|
||||||
|
#log4j.appender.FILE.layout=org.apache.log4j.PatternLayout
|
||||||
|
#log4j.appender.FILE.layout.conversionPattern=%m%n
|
||||||
64
src/main/scala/ru/sa2/hive/concatenator/Main.scala
Normal file
64
src/main/scala/ru/sa2/hive/concatenator/Main.scala
Normal file
@@ -0,0 +1,64 @@
|
|||||||
|
package ru.sa2.hive.concatenator
|
||||||
|
|
||||||
|
import java.util.concurrent.{ExecutorService, Executors}
|
||||||
|
|
||||||
|
import com.typesafe.config.ConfigFactory
|
||||||
|
import org.slf4j.LoggerFactory
|
||||||
|
import ru.sa2.hive.concatenator.core.{Compressor, PartitionTask, PartitionTaskGenerator}
|
||||||
|
import ru.sa2.hive.concatenator.helper.{HdfsHelper, HiveMetastoreHelper}
|
||||||
|
|
||||||
|
|
||||||
|
object Main extends App {
|
||||||
|
|
||||||
|
val LOG = LoggerFactory.getLogger(this.getClass)
|
||||||
|
|
||||||
|
implicit val config = ConfigFactory.load("application.conf")
|
||||||
|
|
||||||
|
lazy val pool: ExecutorService = Executors.newFixedThreadPool(config.getInt("app.parallel.tasks"))
|
||||||
|
|
||||||
|
val metastoreHelper = new HiveMetastoreHelper()
|
||||||
|
|
||||||
|
Class.forName(config.getString("app.db.driver"))
|
||||||
|
|
||||||
|
val tables = metastoreHelper.allTables(new TablesFilter(config.getString("app.tables.filter")))
|
||||||
|
tables.map(_.name()).foreach(LOG.info)
|
||||||
|
|
||||||
|
//Before optimizing
|
||||||
|
new StatisticCalculator(tables).printStatistics()
|
||||||
|
|
||||||
|
val tasks = tables
|
||||||
|
.flatMap(table => new PartitionTaskGenerator(table).generatePartitionTasks())
|
||||||
|
.filterNot(genFilter())
|
||||||
|
|
||||||
|
val results = tasks
|
||||||
|
.map { task =>
|
||||||
|
LOG.info(s"Submitted task for ${task.table.name()} ${task.partition}")
|
||||||
|
pool.submit(new Compressor(task))
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG.info(s"Number of tasks ${tasks.length}")
|
||||||
|
|
||||||
|
val watchdog = new Thread(new RemainingTasksMonitor(results))
|
||||||
|
watchdog.start()
|
||||||
|
watchdog.join()
|
||||||
|
|
||||||
|
pool.shutdown()
|
||||||
|
|
||||||
|
//After optimizing
|
||||||
|
new StatisticCalculator(tables).printStatistics()
|
||||||
|
|
||||||
|
def genFilter(): PartitionTask => Boolean = {
|
||||||
|
val default: PartitionTask => Boolean = task => HdfsHelper.sufficientFileSize(task.path)
|
||||||
|
|
||||||
|
if (config.hasPath("app.task.filter.strategy")) {
|
||||||
|
config.getString("app.task.filter.strategy") match {
|
||||||
|
case "force" => task: PartitionTask => false
|
||||||
|
case "default" => default
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
default
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,34 @@
|
|||||||
|
package ru.sa2.hive.concatenator
|
||||||
|
|
||||||
|
import java.util.concurrent.{Future, TimeUnit}
|
||||||
|
|
||||||
|
import org.slf4j.LoggerFactory
|
||||||
|
import scala.util.control.Breaks._
|
||||||
|
|
||||||
|
|
||||||
|
class RemainingTasksMonitor(tasks: Seq[Future[_]]) extends Runnable {
|
||||||
|
|
||||||
|
val LOG = LoggerFactory.getLogger(this.getClass)
|
||||||
|
|
||||||
|
override def run(): Unit = {
|
||||||
|
|
||||||
|
var previous = tasks.size
|
||||||
|
|
||||||
|
breakable {
|
||||||
|
while (true) {
|
||||||
|
val remaining = tasks.count(!_.isDone)
|
||||||
|
|
||||||
|
if (remaining == 0) break
|
||||||
|
if (remaining != previous) {
|
||||||
|
printInfo(remaining)
|
||||||
|
previous = remaining
|
||||||
|
}
|
||||||
|
TimeUnit.SECONDS.sleep(3)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
def printInfo(n: Int) = LOG.info(s"Remaining $n tasks")
|
||||||
|
|
||||||
|
}
|
||||||
@@ -0,0 +1,37 @@
|
|||||||
|
package ru.sa2.hive.concatenator
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.Path
|
||||||
|
import org.slf4j.LoggerFactory
|
||||||
|
import ru.sa2.hive.concatenator.core.TableInfo
|
||||||
|
import ru.sa2.hive.concatenator.helper.HdfsHelper
|
||||||
|
|
||||||
|
|
||||||
|
class StatisticCalculator (tables: Seq[TableInfo]){
|
||||||
|
|
||||||
|
private val LOG = LoggerFactory.getLogger(this.getClass)
|
||||||
|
|
||||||
|
def printStatistics() = {
|
||||||
|
tables.foreach{table =>
|
||||||
|
val allFiles = table.listPartitions().flatMap{partition =>
|
||||||
|
HdfsHelper.lengthsFilesIn(new Path(s"${table.rootLocation()}/$partition"))}
|
||||||
|
|
||||||
|
LOG.info(s"Calculating stats for ${table.name()}")
|
||||||
|
LOG.info(
|
||||||
|
s"""
|
||||||
|
|Table: ${table.name()}
|
||||||
|
|Files: ${allFiles.length}
|
||||||
|
|MeanSize: ${meanFileSizeInTable(allFiles)} bytes
|
||||||
|
|-----------------------------------------------------
|
||||||
|
""".stripMargin)
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private def meanFileSizeInTable(lengths: Seq[Long]) = {
|
||||||
|
if (lengths.nonEmpty) // TODO: change this shit to 'fold'
|
||||||
|
lengths.sum / lengths.length
|
||||||
|
else
|
||||||
|
0L
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
23
src/main/scala/ru/sa2/hive/concatenator/TablesFilter.scala
Normal file
23
src/main/scala/ru/sa2/hive/concatenator/TablesFilter.scala
Normal file
@@ -0,0 +1,23 @@
|
|||||||
|
package ru.sa2.hive.concatenator
|
||||||
|
|
||||||
|
import org.slf4j.LoggerFactory
|
||||||
|
import ru.sa2.hive.concatenator.core.TableInfo
|
||||||
|
|
||||||
|
class TablesFilter(filter: String) {
|
||||||
|
|
||||||
|
val LOG = LoggerFactory.getLogger(getClass)
|
||||||
|
|
||||||
|
val dbRegexps = filter.split(",").map(_.split('.')(0).r)
|
||||||
|
|
||||||
|
LOG.info(s"filter: $filter")
|
||||||
|
LOG.info(s"regexp db: ${dbRegexps.mkString(", ")}")
|
||||||
|
|
||||||
|
val tableRegexps = filter.split(",").map(_.r)
|
||||||
|
|
||||||
|
LOG.info(s"regexp tbl: ${tableRegexps.mkString(", ")}")
|
||||||
|
|
||||||
|
def validateDB(db: String): Boolean = dbRegexps.exists(_.findFirstIn(db).nonEmpty)
|
||||||
|
|
||||||
|
def validateTable(table: TableInfo): Boolean = tableRegexps.exists(_.findFirstIn(table.name()).nonEmpty)
|
||||||
|
|
||||||
|
}
|
||||||
27
src/main/scala/ru/sa2/hive/concatenator/config/Config.scala
Normal file
27
src/main/scala/ru/sa2/hive/concatenator/config/Config.scala
Normal file
@@ -0,0 +1,27 @@
|
|||||||
|
//package ru.sa2.hive.concatenator.config
|
||||||
|
//
|
||||||
|
//import com.typesafe.config.ConfigFactory
|
||||||
|
//
|
||||||
|
//import scala.util.Properties
|
||||||
|
//
|
||||||
|
//class Config(fileNameOption: Option[String] = None) {
|
||||||
|
//
|
||||||
|
// val inner = fileNameOption.fold(
|
||||||
|
// ifEmpty = ConfigFactory.load())(
|
||||||
|
// file => ConfigFactory.load(file))
|
||||||
|
//
|
||||||
|
// def envOrConfig(name: String): String = {
|
||||||
|
// Properties.envOrElse(
|
||||||
|
// name.toUpperCase.replace("""\.""", "_"),
|
||||||
|
// inner.getString(name)
|
||||||
|
// )
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// def propOrConfig(name: String): String = {
|
||||||
|
// Properties.propOrElse(
|
||||||
|
// name.toUpperCase.replace("""\.""", "_"),
|
||||||
|
// inner.getString(name)
|
||||||
|
// )
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
//}
|
||||||
@@ -0,0 +1,59 @@
|
|||||||
|
package ru.sa2.hive.concatenator.core
|
||||||
|
|
||||||
|
import java.util.concurrent.Callable
|
||||||
|
|
||||||
|
import com.typesafe.config.Config
|
||||||
|
import org.slf4j.LoggerFactory
|
||||||
|
import ru.sa2.hive.concatenator.dao.DBService
|
||||||
|
import ru.sa2.hive.concatenator.helper.HdfsHelper
|
||||||
|
import scala.collection.JavaConverters._
|
||||||
|
|
||||||
|
class Compressor(task: PartitionTask)(implicit config: Config) extends Callable[Double] {
|
||||||
|
|
||||||
|
val Log = LoggerFactory.getLogger(this.getClass)
|
||||||
|
|
||||||
|
override def call(): Double = {
|
||||||
|
Log.info(s"Start compressing ${task.table.name()} ${task.partition}")
|
||||||
|
|
||||||
|
lazy val connection = DBService.getConnectionForUser(getTableOwner())
|
||||||
|
|
||||||
|
// connection.setProp("hive.output.file.extension", ".gz.parquet")
|
||||||
|
// connection.setProp("hive.exec.dynamic.partition", "true")
|
||||||
|
// connection.setProp("hive.exec.dynamic.partition.mode", "nonstrict")
|
||||||
|
|
||||||
|
getHiveConfigs().foreach{ case (key, value) =>
|
||||||
|
Log.info(s"Set extra hive param: '$key' = '$value'")
|
||||||
|
connection.setProp(key, value)
|
||||||
|
}
|
||||||
|
|
||||||
|
connection.setProp("hive.exec.dynamic.partition", "true")
|
||||||
|
connection.setProp("hive.exec.dynamic.partition.mode", "nonstrict")
|
||||||
|
connection.setProp("mapred.job.name", s"${task.generateJobName()}")
|
||||||
|
connection.setProp("mapreduce.job.reduces", HdfsHelper.numReduceTasks(task.path).toString)
|
||||||
|
connection.execUpdate(task.query)
|
||||||
|
|
||||||
|
connection.close()
|
||||||
|
|
||||||
|
Log.info(s"Compressed ${task.table.name()} ${task.partition}")
|
||||||
|
|
||||||
|
val lengths = HdfsHelper.lengthsFilesIn(task.path).toList
|
||||||
|
lengths.sum / lengths.length
|
||||||
|
}
|
||||||
|
|
||||||
|
private def getHiveConfigs(): Map[String, String] = {
|
||||||
|
config.getStringList("app.hiveopts").asScala
|
||||||
|
.map{ param =>
|
||||||
|
val eq = param.indexOf("=")
|
||||||
|
param.take(eq) -> param.drop(eq + 1)
|
||||||
|
|
||||||
|
}.toMap
|
||||||
|
}
|
||||||
|
|
||||||
|
private def getTableOwner() ={
|
||||||
|
if( config.hasPath("app.tables.owner") )
|
||||||
|
config.getString("app.tables.owner")
|
||||||
|
else
|
||||||
|
task.table.owner()
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@@ -0,0 +1,11 @@
|
|||||||
|
package ru.sa2.hive.concatenator.core
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.Path
|
||||||
|
|
||||||
|
class PartitionTask(val table: TableInfo, val path: Path, val query: String, val partition: String) {
|
||||||
|
|
||||||
|
def generateJobName(): String = {
|
||||||
|
s"Compressing ${table.name()} $partition"
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@@ -0,0 +1,31 @@
|
|||||||
|
package ru.sa2.hive.concatenator.core
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.Path
|
||||||
|
|
||||||
|
class PartitionTaskGenerator(table: TableInfo) {
|
||||||
|
|
||||||
|
def generatePartitionTasks(): List[PartitionTask] = {
|
||||||
|
table.listPartitions().map(partition =>
|
||||||
|
new PartitionTask(
|
||||||
|
table,
|
||||||
|
new Path(s"${table.rootLocation()}/$partition"),
|
||||||
|
generateQuery(partition),
|
||||||
|
partition
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
private def generatePartitionClause(parts: String): String =
|
||||||
|
parts.split("/").map(_.split("=").head).mkString(", ")
|
||||||
|
|
||||||
|
private def generateWhereClause(parts: String): String =
|
||||||
|
parts.split("/").map(_.split("=")).map(a => s"${a(0)} = '${a(1)}'").mkString(" AND ")
|
||||||
|
|
||||||
|
|
||||||
|
private def generateQuery(partition: String): String =
|
||||||
|
if (table.isPartitioned())
|
||||||
|
s"INSERT OVERWRITE TABLE ${table.name()} PARTITION (${generatePartitionClause(partition)}) SELECT DISTINCT * FROM ${table.name()} WHERE ${generateWhereClause(partition)}"
|
||||||
|
else
|
||||||
|
s"INSERT OVERWRITE TABLE ${table.name()} SELECT DISTINCT * FROM ${table.name()}"
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
25
src/main/scala/ru/sa2/hive/concatenator/core/TableInfo.scala
Normal file
25
src/main/scala/ru/sa2/hive/concatenator/core/TableInfo.scala
Normal file
@@ -0,0 +1,25 @@
|
|||||||
|
package ru.sa2.hive.concatenator.core
|
||||||
|
|
||||||
|
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient
|
||||||
|
import org.apache.hadoop.hive.metastore.api.Table
|
||||||
|
|
||||||
|
import scala.collection.JavaConverters._
|
||||||
|
|
||||||
|
class TableInfo(val table: Table, metastore: HiveMetaStoreClient) {
|
||||||
|
|
||||||
|
|
||||||
|
def name(): String = table.getDbName + "." + table.getTableName
|
||||||
|
|
||||||
|
def isPartitioned(): Boolean = table.getPartitionKeysSize > 0
|
||||||
|
|
||||||
|
def listPartitions() =
|
||||||
|
if (isPartitioned())
|
||||||
|
metastore.listPartitionNames(table.getDbName, table.getTableName, 0).asScala.toList
|
||||||
|
else
|
||||||
|
List("")
|
||||||
|
|
||||||
|
def rootLocation() = table.getSd.getLocation.replaceAll("hdfs://nameservice1", "")
|
||||||
|
|
||||||
|
def owner() = table.getOwner
|
||||||
|
|
||||||
|
}
|
||||||
28
src/main/scala/ru/sa2/hive/concatenator/dao/DBService.scala
Normal file
28
src/main/scala/ru/sa2/hive/concatenator/dao/DBService.scala
Normal file
@@ -0,0 +1,28 @@
|
|||||||
|
package ru.sa2.hive.concatenator.dao
|
||||||
|
|
||||||
|
import java.sql.{Connection, DriverManager}
|
||||||
|
|
||||||
|
import com.typesafe.config.ConfigFactory
|
||||||
|
import org.slf4j.LoggerFactory
|
||||||
|
|
||||||
|
import scala.util.Properties
|
||||||
|
|
||||||
|
|
||||||
|
object DBService {
|
||||||
|
private val LOG = LoggerFactory.getLogger(this.getClass)
|
||||||
|
private val config = ConfigFactory.load("application.conf")
|
||||||
|
|
||||||
|
def apply(): DBService = new DBService(connection = getConnection())
|
||||||
|
|
||||||
|
def getConnectionForUser(user: String): HiveClient = {
|
||||||
|
LOG.info(s"Try to open connection for user: $user")
|
||||||
|
HiveClient(DriverManager.getConnection(config.getString("app.db.url"), user, ""))
|
||||||
|
}
|
||||||
|
|
||||||
|
private def getConnection(): Connection = DriverManager.getConnection(config.getString("app.db.url"), config.getString("app.db.user"), "")
|
||||||
|
}
|
||||||
|
|
||||||
|
class DBService(val connection: Connection) {
|
||||||
|
|
||||||
|
def cleanup() = connection.close()
|
||||||
|
}
|
||||||
41
src/main/scala/ru/sa2/hive/concatenator/dao/Executor.scala
Normal file
41
src/main/scala/ru/sa2/hive/concatenator/dao/Executor.scala
Normal file
@@ -0,0 +1,41 @@
|
|||||||
|
package ru.sa2.hive.concatenator.dao
|
||||||
|
|
||||||
|
import java.sql.{Connection, ResultSet}
|
||||||
|
|
||||||
|
import org.slf4j.LoggerFactory
|
||||||
|
|
||||||
|
|
||||||
|
object Executor {
|
||||||
|
def apply(connection: Connection): Executor = new Executor(connection)
|
||||||
|
}
|
||||||
|
|
||||||
|
class Executor(connection: Connection) {
|
||||||
|
|
||||||
|
private val LOG = LoggerFactory.getLogger(this.getClass)
|
||||||
|
|
||||||
|
def execUpdate(query: String): Int = {
|
||||||
|
LOG.debug(s"Running query:\t ${query}")
|
||||||
|
val stmt = connection.createStatement()
|
||||||
|
val updated = stmt.executeUpdate(query)
|
||||||
|
LOG.debug("Closing statement")
|
||||||
|
stmt.close()
|
||||||
|
updated
|
||||||
|
}
|
||||||
|
|
||||||
|
def execQuery[T](query: String, f: ResultSet => T ): T = {
|
||||||
|
LOG.debug(s"Running query:\t $query")
|
||||||
|
val stmt = connection.createStatement()
|
||||||
|
val rs = stmt.executeQuery(query)
|
||||||
|
LOG.debug("Calculating result")
|
||||||
|
val value = f(rs)
|
||||||
|
try {
|
||||||
|
LOG.debug("Closing rs and statement")
|
||||||
|
rs.close()
|
||||||
|
stmt.close()
|
||||||
|
} catch {
|
||||||
|
case _: Exception => LOG.warn("Something bad on close resultset or statement")
|
||||||
|
}
|
||||||
|
value
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
19
src/main/scala/ru/sa2/hive/concatenator/dao/HiveClient.scala
Normal file
19
src/main/scala/ru/sa2/hive/concatenator/dao/HiveClient.scala
Normal file
@@ -0,0 +1,19 @@
|
|||||||
|
package ru.sa2.hive.concatenator.dao
|
||||||
|
|
||||||
|
import java.sql.{Connection, ResultSet}
|
||||||
|
|
||||||
|
|
||||||
|
object HiveClient {
|
||||||
|
def apply(connection: Connection): HiveClient = new HiveClient(connection)
|
||||||
|
}
|
||||||
|
|
||||||
|
class HiveClient(connection: Connection) {
|
||||||
|
|
||||||
|
def setProp(key: String, value: String) = execUpdate(s"SET $key=$value")
|
||||||
|
|
||||||
|
def execUpdate(query: String) = Executor(connection).execUpdate(query)
|
||||||
|
|
||||||
|
def execQuery[T](query: String, f: ResultSet => T): T = Executor(connection).execQuery(query, f)
|
||||||
|
|
||||||
|
def close() = connection.close()
|
||||||
|
}
|
||||||
@@ -0,0 +1,53 @@
|
|||||||
|
package ru.sa2.hive.concatenator.helper
|
||||||
|
|
||||||
|
import com.typesafe.config.ConfigFactory
|
||||||
|
import org.apache.hadoop.conf.Configuration
|
||||||
|
import org.apache.hadoop.fs.{FileSystem, Path}
|
||||||
|
|
||||||
|
|
||||||
|
object HdfsHelper {
|
||||||
|
|
||||||
|
private val config = ConfigFactory.load("application.conf")
|
||||||
|
|
||||||
|
val DEFAULT_BLOCK_SIZE = 256000000.0
|
||||||
|
|
||||||
|
lazy val hdfs = getHDFS()
|
||||||
|
|
||||||
|
def isSufficientFileSize(summ: Long, mean: Double): Boolean = (mean > 200000000 && mean < 300000000) || // TODO: change summ of files length to files count
|
||||||
|
(summ < 256000000 && (mean == summ))
|
||||||
|
|
||||||
|
def sufficientFileSize(path: Path): Boolean = {
|
||||||
|
|
||||||
|
if (hdfs.exists(path) && containFiles(path)) {
|
||||||
|
val lengths = lengthsFilesIn(path)
|
||||||
|
val summSize = lengths.sum
|
||||||
|
val meanSize = summSize / lengths.length
|
||||||
|
|
||||||
|
isSufficientFileSize(summSize, meanSize)
|
||||||
|
}
|
||||||
|
else
|
||||||
|
true
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private def getHDFS() = {
|
||||||
|
val conf = new Configuration()
|
||||||
|
conf.set("fs.defaultFS", config.getString("app.services.hdfs"))
|
||||||
|
FileSystem.get(conf)
|
||||||
|
}
|
||||||
|
|
||||||
|
def numReduceTasks(path: Path): Int = blocksCount(lengthsFilesIn(path))
|
||||||
|
|
||||||
|
def blocksCount(lengths: Seq[Long]): Int = {
|
||||||
|
math.ceil(lengths.sum / DEFAULT_BLOCK_SIZE).toInt
|
||||||
|
}
|
||||||
|
|
||||||
|
def lengthsFilesIn(path: Path) =
|
||||||
|
if (hdfs.exists(path))
|
||||||
|
hdfs.listStatus(path).filter(_.isFile).filterNot(_.getPath.getName.contains("_SUCCESS")).map(_.getLen)
|
||||||
|
else
|
||||||
|
Array(0L)
|
||||||
|
|
||||||
|
def containFiles(path: Path) = lengthsFilesIn(path).length != 0
|
||||||
|
|
||||||
|
}
|
||||||
@@ -0,0 +1,55 @@
|
|||||||
|
package ru.sa2.hive.concatenator.helper
|
||||||
|
|
||||||
|
import com.typesafe.config.Config
|
||||||
|
import org.apache.hadoop.hive.conf.HiveConf
|
||||||
|
import org.apache.hadoop.hive.metastore.api.LockResponse
|
||||||
|
import org.apache.hadoop.hive.metastore.{HiveMetaStoreClient, LockComponentBuilder, LockRequestBuilder}
|
||||||
|
import ru.sa2.hive.concatenator.TablesFilter
|
||||||
|
import ru.sa2.hive.concatenator.core.TableInfo
|
||||||
|
|
||||||
|
import scala.collection.JavaConverters._
|
||||||
|
|
||||||
|
|
||||||
|
class HiveMetastoreHelper(implicit config: Config) {
|
||||||
|
|
||||||
|
private val metastore = getHMC()
|
||||||
|
|
||||||
|
def allDatabases = metastore.getAllDatabases.asScala
|
||||||
|
|
||||||
|
def allTables(tablesFilter: TablesFilter) = allDatabases filter {tablesFilter.validateDB} flatMap getTables filter {tablesFilter.validateTable}
|
||||||
|
|
||||||
|
def getTables(dbName: String): Seq[TableInfo] =
|
||||||
|
metastore.getTableObjectsByName(dbName, metastore.getAllTables(dbName)).asScala
|
||||||
|
.map(table => new TableInfo(table, metastore))
|
||||||
|
|
||||||
|
def lockResponse(table: TableInfo): LockResponse = {
|
||||||
|
val component = new LockComponentBuilder()
|
||||||
|
.setDbName(table.table.getDbName)
|
||||||
|
.setTableName(table.table.getTableName)
|
||||||
|
.setExclusive().build()
|
||||||
|
metastore.lock(new LockRequestBuilder().addLockComponent(component).setUser(table.owner()).build())
|
||||||
|
}
|
||||||
|
|
||||||
|
private def getHMC() = {
|
||||||
|
val conf = new HiveConf()
|
||||||
|
conf.setVar(HiveConf.ConfVars.METASTOREURIS, config.getString("app.services.hive_metastore_uri"))
|
||||||
|
new HiveMetaStoreClient(conf)
|
||||||
|
}
|
||||||
|
|
||||||
|
def getTablePartitions(tableName: String)(implicit hmc: HiveMetaStoreClient): List[Map[String, String]] = {
|
||||||
|
|
||||||
|
val Array(database, table) = tableName.split("\\.") //TODO add table name parser
|
||||||
|
val isPartitioned = hmc.getTable(database, table).getPartitionKeysSize > 0
|
||||||
|
|
||||||
|
require(isPartitioned, s"Table $tableName is not partitioned!")
|
||||||
|
|
||||||
|
hmc.listPartitionNames(database, table, 0).asScala
|
||||||
|
.toList
|
||||||
|
.map(_.split("/")
|
||||||
|
.map(_.split("="))
|
||||||
|
.map(a => a(0) -> a(1)).toMap
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
20
src/test/resources/testconfig.conf
Normal file
20
src/test/resources/testconfig.conf
Normal file
@@ -0,0 +1,20 @@
|
|||||||
|
app {
|
||||||
|
parallel {
|
||||||
|
tasks = "10"
|
||||||
|
}
|
||||||
|
tables {
|
||||||
|
filter = "some_hive_db.*\\..*"
|
||||||
|
}
|
||||||
|
db {
|
||||||
|
driver = "org.apache.hive.jdbc.HiveDriver"
|
||||||
|
url = "jdbc:hive2://hive_metastore:10000/default"
|
||||||
|
user = "default"
|
||||||
|
user = ${?USER}
|
||||||
|
}
|
||||||
|
services {
|
||||||
|
hive_metastore_uri = "thrift://hive_metastore:9083"
|
||||||
|
hdfs = "hdfs://nameservice1:8020"
|
||||||
|
}
|
||||||
|
hiveopts = [ "hive.output.file.extension=.snappy.parquet" ]
|
||||||
|
|
||||||
|
}
|
||||||
33
src/test/scala/ConfigTest.scala
Normal file
33
src/test/scala/ConfigTest.scala
Normal file
@@ -0,0 +1,33 @@
|
|||||||
|
package ololo
|
||||||
|
|
||||||
|
import com.typesafe.config.{Config, ConfigFactory, ConfigRenderOptions}
|
||||||
|
|
||||||
|
import scala.collection.JavaConverters._
|
||||||
|
|
||||||
|
object ConfigTest extends App {
|
||||||
|
|
||||||
|
val conf = ConfigFactory.load("testconfig.conf")
|
||||||
|
|
||||||
|
// println(conf.getConfig("app.hiveopts"))
|
||||||
|
|
||||||
|
println(conf.getStringList("app.hiveopts"))
|
||||||
|
|
||||||
|
println()
|
||||||
|
|
||||||
|
conf.getStringList("app.hiveopts").asScala
|
||||||
|
.map{ param =>
|
||||||
|
|
||||||
|
val eq = param.indexOf("=")
|
||||||
|
|
||||||
|
println(param)
|
||||||
|
println(param.take(eq))
|
||||||
|
println(param.drop(eq + 1))
|
||||||
|
|
||||||
|
param.take(eq) -> param.takeRight(eq)
|
||||||
|
|
||||||
|
}
|
||||||
|
.foreach(println)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
41
src/test/scala/ru/sa2/hive/concatenator/HdfsHelperTest.scala
Normal file
41
src/test/scala/ru/sa2/hive/concatenator/HdfsHelperTest.scala
Normal file
@@ -0,0 +1,41 @@
|
|||||||
|
package ru.sa2.hive.concatenator
|
||||||
|
|
||||||
|
import org.scalatest.{FunSpec, Matchers}
|
||||||
|
import ru.sa2.hive.concatenator.helper.HdfsHelper
|
||||||
|
|
||||||
|
class HdfsHelperTest extends FunSpec with Matchers {
|
||||||
|
|
||||||
|
describe("HdfsHelperTest") {
|
||||||
|
|
||||||
|
it("should isEnoughFileSize") {
|
||||||
|
|
||||||
|
HdfsHelper.isSufficientFileSize(800000000, 210000000) shouldBe true
|
||||||
|
HdfsHelper.isSufficientFileSize(200000000, 200000000) shouldBe true
|
||||||
|
HdfsHelper.isSufficientFileSize(26700000, 26700000) shouldBe true
|
||||||
|
|
||||||
|
HdfsHelper.isSufficientFileSize(800000000, 800000000) shouldBe false
|
||||||
|
HdfsHelper.isSufficientFileSize(20000000, 7000000) shouldBe false
|
||||||
|
|
||||||
|
HdfsHelper.isSufficientFileSize(200000000, 70000000) shouldBe false
|
||||||
|
|
||||||
|
HdfsHelper.isSufficientFileSize(800000000, 150000000) shouldBe false
|
||||||
|
HdfsHelper.isSufficientFileSize(800000000, 400000000) shouldBe false
|
||||||
|
}
|
||||||
|
|
||||||
|
it("should return correct number reducers") {
|
||||||
|
HdfsHelper.blocksCount(Seq(200000000)) shouldBe 1
|
||||||
|
HdfsHelper.blocksCount(Seq(280000000)) shouldBe 2
|
||||||
|
}
|
||||||
|
|
||||||
|
// it("should generatePartitionClause") {
|
||||||
|
//// Main.generatePartitionClause("month=201707/first_id=38/second_id=7440") shouldBe "month = '201707', first_id = '38', second_id = '7440'"
|
||||||
|
// Main.generatePartitionClause("month=201707/first_id=38/second_id=7440") shouldBe "month, first_id, second_id"
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// it("should generateWhereClause") {
|
||||||
|
// Main.generateWhereClause("month=201707/first_id=38/second_id=7440") shouldBe "month = '201707' AND first_id = '38' AND second_id = '7440'"
|
||||||
|
//
|
||||||
|
// }
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,51 @@
|
|||||||
|
package ru.sa2.hive.concatenator
|
||||||
|
|
||||||
|
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient
|
||||||
|
import org.apache.hadoop.hive.metastore.api.Table
|
||||||
|
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSpec, Matchers}
|
||||||
|
import org.mockito.Mockito._
|
||||||
|
import org.scalatest.mockito.MockitoSugar
|
||||||
|
import ru.sa2.hive.concatenator.core.TableInfo
|
||||||
|
|
||||||
|
|
||||||
|
class TablesFilterTest extends FunSpec with Matchers with MockitoSugar {
|
||||||
|
|
||||||
|
val tablesFilter = new TablesFilter("tmp_.*\\..*ololo")
|
||||||
|
|
||||||
|
describe("TablesFilterTest") {
|
||||||
|
|
||||||
|
it("correct filtering databases") {
|
||||||
|
Set("tmp_astrn", "tmp_ololo", "asdad", "asdasdasd", "tmpsdjhfsjdfh")
|
||||||
|
.filter(tablesFilter.validateDB) shouldBe Set("tmp_astrn", "tmp_ololo")
|
||||||
|
}
|
||||||
|
|
||||||
|
it("correct filtering tables") {
|
||||||
|
val input = Seq("tmp_astrn.sdfsdf", "tmp_ololo.sdd", "tmp_ololo.ololo", "tmp_ololo.sddololo")
|
||||||
|
val answer = Seq("tmp_ololo.ololo", "tmp_ololo.sddololo")
|
||||||
|
|
||||||
|
validateTables(input, tablesFilter) shouldBe answer
|
||||||
|
}
|
||||||
|
|
||||||
|
it("correct filtering tables max") {
|
||||||
|
val filter = new TablesFilter("some_db.?\\.(this.*|.*or_this.*)")
|
||||||
|
val input = Seq("some_db.thisx_one", "some_db.this_two", "some_db.or_this", "some_db.qqqor_this", "some_db.qqqor_thisqqq",
|
||||||
|
"some_db.aaa", "some_db.bbb", "some_db.ololo")
|
||||||
|
val answer = Seq("some_db.thisx_one", "some_db.this_two", "some_db.or_this", "some_db.qqqor_this", "some_db.qqqor_thisqqq")
|
||||||
|
|
||||||
|
validateTables(input, filter) shouldBe answer
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def validateTables(tables: Seq[String], filter: TablesFilter): Seq[String] = tables
|
||||||
|
.map(name => mockTableInfo(name))
|
||||||
|
.filter(filter.validateTable)
|
||||||
|
.map(_.name())
|
||||||
|
|
||||||
|
|
||||||
|
def mockTableInfo(answer: String) = {
|
||||||
|
val ti = mock[TableInfo]
|
||||||
|
when(ti.name()).thenReturn(answer)
|
||||||
|
ti
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user