How to Process Nasty Fixed Width Files Using Apache Spark

A fixed-width file is a very common flat-file format when working with SAP, mainframes, and web logs. Converting that data into a DataFrame using column metadata is always a challenge for Spark developers. This article walks through the typical discrepancies a developer might face while working with a fixed-width file. The solution is generic to any fixed-width file and very easy to implement. Because each record is split with foldLeft rather than explicit recursion, it is also stack-safe, no matter how many columns the metadata defines.

Let’s check the source file first and then the metadata file:
====================================
10597 Subhasish Guha subhasish.iot@gmail.com 27 TXNPUES Yes 5007.10
191675 Ritam Mukherjee ritam@gmail.com 29 SINGLE OUNHQEX XUFQONY No 01/14/20133172.53 43537.00
Saurav Agarwal agarwalsaurav@gmail.com 30 MARRIED 100000.00 7000.00
80495 Priyanshu Kumar kumarpriyanshu@gmail.com 45 MARRIED XDZANTV No 11/21/2012
====================================
The source file has multiple issues:

1. The last field is not padded out with trailing spaces.
2. In some records the last one or two fields are missing entirely, so the lines are shorter than the declared layout (see the sketch after this list).
3. Some fields in the middle of a record are blank or missing.
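To see why these records need special handling, consider a naive parser that slices each record at fixed offsets computed from the column widths (the widths come from the metadata file shown next; this is only an illustrative sketch, not part of the final solution). On any truncated record it throws a StringIndexOutOfBoundsException, which is exactly what the length check in the lsplit function further below avoids:
====================================
// Naive fixed-offset slicing: assumes every record is exactly
// 10+50+40+3+10+27+3+10+10+10 = 173 characters wide.
val widths = List(10, 50, 40, 3, 10, 27, 3, 10, 10, 10)
val starts = widths.scanLeft(0)(_ + _)   // running offsets: 0, 10, 60, 100, ...

def naiveSplit(line: String): Seq[String] =
  starts.zip(widths).map { case (start, w) =>
    line.substring(start, start + w).trim   // throws on records shorter than 173 chars
  }
====================================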
The metadata file looks like this:
=======================
col_name,size
account_id,10
name,50
mail,40
age,3
status,10
desc,27
cflow,3
process_date,10
debit_amt,10
credit_amt,10
=======================
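Before wiring anything into Spark, it is worth a quick sanity check on the metadata (optional, and not part of the code below; the path is a placeholder): the declared widths add up to 173 characters, so a well-formed record can be at most that long, and the shorter ones are the nasty cases described above.
=======================
// Optional sanity check: sum the widths declared in the metadata file.
// 10 + 50 + 40 + 3 + 10 + 27 + 3 + 10 + 10 + 10 = 173
val widths = scala.io.Source.fromFile("path of your Metadata File")
  .getLines()
  .drop(1)                               // skip the "col_name,size" header
  .map(_.split(",")(1).trim.toInt)
  .toList
println(s"expected record length: ${widths.sum}")
=======================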
Now let's look at the generic code that loads the data into a DataFrame:
=============================================
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
object FixedWidth {

  // Slices one record into column values using the widths from the metadata file.
  // foldLeft carries the unconsumed tail of the record along with the values
  // collected so far, so the split is stack-safe regardless of the column count.
  def lsplit(pos: List[Int], str: String): Row = {
    val (_, result) = pos.foldLeft((str, List[String]())) {
      case ((s, res), curr) =>
        if (s.length() >= curr) {
          // Enough characters left: take this column's slice and carry the rest forward.
          val split = s.substring(0, curr).trim()
          val rest  = s.substring(curr)
          (rest, split :: res)
        } else {
          // The record ran short: keep whatever is left for this column and
          // emit empty strings for every remaining column.
          val split = s.trim()
          val rest  = ""
          (rest, split :: res)
        }
    }
    Row.fromSeq(result.reverse)
  }
  def main(args: Array[String]): Unit = {
    System.setProperty("hadoop.home.dir", "C://winutils//")
    val spark = SparkSession.builder().master("yarn")
      .config("spark.sql.warehouse.dir", "file:///C://Personalized//")
      .getOrCreate()

    // Read the raw fixed-width file as plain text and the metadata as a CSV.
    val rdd      = spark.sparkContext.textFile("path of your file")
    val metadata = spark.read.option("header", "true").csv("path of your Metadata File")

    // Column names and widths from the metadata drive both the schema and the split.
    val header       = metadata.select("col_name").rdd.map(x => x.getString(0).trim()).collect()
    val sizeOfColumn = metadata.select("size").rdd.map(x => x.getString(0).trim()).collect().map(_.toInt).toList

    val fields = header.map(fieldName => StructField(fieldName, StringType, nullable = true))
    val schema = StructType(fields)

    val df = spark.createDataFrame(rdd.map { x => lsplit(sizeOfColumn, x) }, schema)
    df.show(false)
  }
}
=============================================

The output of this code is shown below. I hope this helps developers who are handling this kind of file and running into these problems; the code covers almost all of the discrepancies described above.
=============================================
+----------+---------------+------------------------+---+-------+---------------+-----+------------+---------+----------+
|account_id|name           |mail                    |age|status |desc           |cflow|process_date|debit_amt|credit_amt|
+----------+---------------+------------------------+---+-------+---------------+-----+------------+---------+----------+
|10597     |Subhasish Guha |subhasish.iot@gmail.com |27 |       |TXNPUES        |Yes  |            |5007.10  |          |
|191675    |Ritam Mukherjee|ritam@gmail.com         |29 |SINGLE |OUNHQEX XUFQONY|No   |01/14/2013  |3172.53  |43537.00  |
|          |Saurav Agarwal |agarwalsaurav@gmail.com |30 |MARRIED|               |     |            |100000.00|7000.00   |
|80495     |Priyanshu Kumar|kumarpriyanshu@gmail.com|45 |MARRIED|XDZANTV        |No   |11/21/2012  |         |          |
+----------+---------------+------------------------+---+-------+---------------+-----+------------+---------+----------+
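
Every column comes out as a string, because the metadata file only carries names and widths. If typed columns are needed downstream, one optional follow-up (not part of the code above; it continues from the df built in main) is to cast them explicitly, which also turns the empty strings from the missing fields into nulls:
=============================================
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{DoubleType, IntegerType}

// Optional post-processing on the DataFrame built in main():
// with Spark's default (non-ANSI) casting, empty or non-numeric strings become null.
val typedDf = df
  .withColumn("age", col("age").cast(IntegerType))
  .withColumn("debit_amt", col("debit_amt").cast(DoubleType))
  .withColumn("credit_amt", col("credit_amt").cast(DoubleType))
typedDf.printSchema()
=============================================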

January 16, 2020
