
I have a relatively large data set with records of jobs submitted to a queue. Some of these records have the start and end of the job's runtime, and how many resources (here, nnodes) they consumed.

What I can do is create a new row for each day of each job. What I also want is how many hours of each of those days the job was using those resources.

+-------+-------------------+-------------------+------+----------+
| job_id|              start|                end|nnodes| job's day|
+-------+-------------------+-------------------+------+----------+
|2747673|2015-01-04 23:44:03|2015-01-05 00:13:18|  1676|2015-01-04|
|2747673|2015-01-04 23:44:03|2015-01-05 00:13:18|  1676|2015-01-05|
+-------+-------------------+-------------------+------+----------+
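For reference, here is a sketch of one way such per-day rows can be generated (this assumes Spark 2.4+ for sequence, and the input DataFrame name jobs is hypothetical):

import org.apache.spark.sql.functions._

// One row per calendar day the job touches: `sequence` builds the inclusive
// list of dates between the start and end dates, and `explode` turns that
// array into separate rows.
val perDay = jobs.withColumn(
  "job's day",
  explode(sequence(to_date(col("start")), to_date(col("end"))))
)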

This seems like it should be a simple task, but I need to account for the number of days each job spans. If a job fits in one day, the answer is a simple subtraction, but for any larger number of days it is not that simple. I wonder if there is a typical solution for this. As one should assume, not all months are of equal length.

P.S. I want to emphasize that I need hours or minutes for each record. So the first rows should be:

+-------+-------------------+-------------------+------+----------+--------+
| job_id|              start|                end|nnodes| job's day| minutes|
+-------+-------------------+-------------------+------+----------+--------+
|2747673|2015-01-04 23:44:03|2015-01-05 00:13:18|  1676|2015-01-04|      16|
|2747673|2015-01-04 23:44:03|2015-01-05 00:13:18|  1676|2015-01-05|      13|
+-------+-------------------+-------------------+------+----------+--------+

and so on.

1 Answer

This is pretty simple to do by converting the dates and times with unix_timestamp and subtracting one from the other. This gives you the difference in seconds; to get it in minutes divide by 60, and to get it in hours divide by 3600 (a quick illustration follows the example DataFrame below).

I've added the logic I think you need in order to derive the time usage for each given day.

import org.apache.spark.sql.functions._
import spark.implicits._   // both are already in scope in a spark-shell

val df = Seq(
    (2747673, "2015-01-04 23:44:03", "2015-01-05 00:00:18", 1676, "2015-01-04"),
    (2747673, "2015-01-04 23:44:03", "2015-01-05 00:00:18", 1676, "2015-01-05")
  ).toDF("job_id", "start", "end", "nnodes", "job's day")

+-------+-------------------+-------------------+------+----------+
| job_id|              start|                end|nnodes| job's day|
+-------+-------------------+-------------------+------+----------+
|2747673|2015-01-04 23:44:03|2015-01-05 00:00:18|  1676|2015-01-04|
|2747673|2015-01-04 23:44:03|2015-01-05 00:00:18|  1676|2015-01-05|
+-------+-------------------+-------------------+------+----------+
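As a quick illustration of the basic subtraction described above (this is just the total end-minus-start runtime, not yet split across days):

// Raw end - start difference in seconds, then converted to minutes.
df.withColumn("total_seconds",
    unix_timestamp(col("end"), "yyyy-MM-dd HH:mm:ss") -
    unix_timestamp(col("start"), "yyyy-MM-dd HH:mm:ss"))
  .withColumn("total_minutes", col("total_seconds") / 60)
  .show

The per-day logic below then clips that difference to each day's boundaries.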

// Per-day usage in seconds, covering four cases:
//   1. the job spans the whole day               -> 86400 seconds
//   2. it started before the day and ends in it  -> end - midnight
//   3. it starts in the day and ends after it    -> next midnight - start
//   4. it starts and ends within the day         -> end - start
val timeUsage = when(unix_timestamp(col("start"), "yyyy-MM-dd HH:mm:ss") < unix_timestamp(col("job's day"), "yyyy-MM-dd") &&
                     unix_timestamp(col("end"), "yyyy-MM-dd HH:mm:ss") > unix_timestamp(date_add(col("job's day"), 1), "yyyy-MM-dd"), lit(86400))
                .when(unix_timestamp(col("start"), "yyyy-MM-dd HH:mm:ss") < unix_timestamp(col("job's day"), "yyyy-MM-dd"),
                      unix_timestamp(col("end"), "yyyy-MM-dd HH:mm:ss") - unix_timestamp(col("job's day"), "yyyy-MM-dd"))
                .when(unix_timestamp(col("end"), "yyyy-MM-dd HH:mm:ss") > unix_timestamp(date_add(col("job's day"), 1), "yyyy-MM-dd"),
                      unix_timestamp(date_add(col("job's day"), 1), "yyyy-MM-dd") - unix_timestamp(col("start"), "yyyy-MM-dd HH:mm:ss"))
                .otherwise(unix_timestamp(col("end"), "yyyy-MM-dd HH:mm:ss") - unix_timestamp(col("start"), "yyyy-MM-dd HH:mm:ss"))

df.withColumn("difference_in_seconds", timeUsage).show

+-------+-------------------+-------------------+------+----------+---------------------+
| job_id|              start|                end|nnodes| job's day|difference_in_seconds|
+-------+-------------------+-------------------+------+----------+---------------------+
|2747673|2015-01-04 23:44:03|2015-01-05 00:00:18|  1676|2015-01-04|                  957|
|2747673|2015-01-04 23:44:03|2015-01-05 00:00:18|  1676|2015-01-05|                   18|
+-------+-------------------+-------------------+------+----------+---------------------+


df.withColumn("difference_in_minutes", timeUsage/60).show

+-------+-------------------+-------------------+------+----------+---------------------+
| job_id|              start|                end|nnodes| job's day|difference_in_minutes|
+-------+-------------------+-------------------+------+----------+---------------------+
|2747673|2015-01-04 23:44:03|2015-01-05 00:00:18|  1676|2015-01-04|                15.95|
|2747673|2015-01-04 23:44:03|2015-01-05 00:00:18|  1676|2015-01-05|                  0.3|
+-------+-------------------+-------------------+------+----------+---------------------+                    
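If whole minutes are wanted, as in the question's expected output, one option (assuming rounding to the nearest minute is acceptable) is to round the same expression:

// Round the per-day usage to the nearest whole minute.
df.withColumn("minutes", round(timeUsage / 60).cast("int")).show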
  • Thank you for the answer, but it is not what I meant; I have updated the question. I am looking for the time the job takes on each day, in minutes or hours (it won't matter much). So for the first day the diff_in_minutes is right, but for the second one it is not. Since a job's duration can be as long as two years, I need the function that computes the minutes to be as simple as possible. – M.Rez Mar 12 at 10:19
  • I've updated my answer to what I think you are after? – randal25 Mar 12 at 11:47
  • You answered my confusion. Actually, I fixed it myself exactly like you did; I just had to add the format pattern for the date, which I was not doing, so I was getting the wrong answer. I thought I had to parse the string myself. Thank you very much. I was away from Scala/Spark for a couple of months. – M.Rez Mar 12 at 11:54
